rocketmq的NameServer模块

栏目: 后端 · 发布时间: 5年前

内容简介:[mq@dev1 bin]$ sh mqnamesrv -helpusage: mqnamesrv [-c <arg>] [-h] [-n <arg>] [-p]-c,–configFile <arg> Name server config properties file

[mq@dev1 bin]$ sh mqnamesrv -help

usage: mqnamesrv [-c ] [-h] [-n ] [-p]

-c,–configFile Name server config properties file

-h,–help Print help

-n,–namesrvAddr Name server address list, eg: 192.168.0.1:9876;192.168.0.2:9876

-p,–printConfigItem Print all config item

rmq的 namesrv服务的详细配置信息如下(采用下面命令, 可以打印全部配置信息)

sh mqnamesrv -p

rocketmqHome=/home/mq/tmp5/xb-rmq-server-4.5.1

kvConfigPath=/home/mq/namesrv/kvConfig.json

configStorePath=/home/mq/namesrv/namesrv.properties

productEnvName=center

clusterTest=false

orderMessageEnable=false

listenPort=9876

serverWorkerThreads=8

serverCallbackExecutorThreads=0

serverSelectorThreads=3

serverOnewaySemaphoreValue=256

serverAsyncSemaphoreValue=64

serverChannelMaxIdleTimeSeconds=120

serverSocketSndBufSize=65535

serverSocketRcvBufSize=65535

serverPooledByteBufAllocatorEnable=true

useEpollNativeSelector=false

部分信息的介绍如下(信息搜集于网络)

NamesrvConfig是NameServer的配置类

这个类放在了common模块中,但是几乎都是被NameServer模块引用,这里就当在NameServer模块中讲

主要讲解字段,方法都是get和set

字段

rocketmqHome

private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV));

RocketMQ home 目录

如果没有指定的话,默认值为系统环境变量ROCKETMQ_HOME

通过System.getenv获取,可以在~/.profile中export

或者可以在配置文件中指定rocketmqHome=***

kvConfigPath

private String kvConfigPath = System.getProperty(“user.home”) + File.separator + “namesrv” + File.separator + “kvConfig.json”;

KvConfig的配置文件路径

默认为System.getProperty(“user.home”)/namesrv/kvConfig.json

configStorePath

private String configStorePath = System.getProperty(“user.home”) + File.separator + “namesrv” + File.separator + “namesrv.properties”;

NamesrvConfig(以及NettyServerConfig)的配置文件路径

默认为System.getProperty(“user.home”)/namesrv/namesrv.properties

productEnvName

private String productEnvName = “center”;

clusterTest为true时work

用于构造ClusterTestRequestProcessor

clusterTest

private boolean clusterTest = false;

代表不同的模式

决定NamesrvController#registerProcessor()注册处理器时

用的是ClusterTestRequestProcessor 还是 DefaultRequestProcessor

orderMessageEnable

是否开启顺序消息功能

private boolean orderMessageEnable = false;

问题

productEnvName有什么用

clusterTest,orderMessageEnable是如何实现对应功能

refer

http://bboyjing.github.io/2017/04/21/RocketMQ%E6%BA%90%E7%A0%81%E5%88%86%E6%9E%90%E4%B9%8B%E3%80%90rocketmq-namesrv%E3%80%91/

http://hiant.github.io/2016/08/26/rocketmq-0x5/ clusterTest作用

1.1. NamesrvStartup 处理流程

设置版本号

com.alibaba.rocketmq.namesrv.processor.DefaultRequestProcessor#processRequest

com.alibaba.rocketmq.broker.processor.AdminBrokerProcessor#callConsumer

设置Socket缓冲区

如果没有设置 SEND 缓冲区:NettySystemConfig.SocketSndbufSize = 2048

如果没有设置 RECV 缓冲区:NettySystemConfig.SocketRcvbufSize = 2048

FastJson版本冲突检测

com.alibaba.rocketmq.common.conflict.PackageConflictDetect#detectFastjson

命令行参数解析

使用 com.alibaba.rocketmq.srvutil.ServerUtil#parseCmdLine 解析命令行,如果解析失败,则程序退出

配置文件加载

命令行中包含 -c 选项,则加载指定的 properties 配置文件,并通过 com.alibaba.rocketmq.common.MixAll#properties2Object 设置 NamesrvConfig、NettyServerConfig。其中,如果配置中没有设置 ListenPort,默认为 9876

输出配置参数列表

命令行中包含 -p 选项,则通过 com.alibaba.rocketmq.common.MixAll#printObjectProperties(org.slf4j.Logger, java.lang.Object) 输出 NamesrvConfig、NettyServerConfig 的声明字段,输出完成后程序退出

配置 NamesrvConfig

通过 com.alibaba.rocketmq.common.MixAll#properties2Object 将命令行参数加到 NamesrvConfig

配置 logback

logback 配置文件固定:namesrvConfig.getRocketmqHome() + “/conf/logback_namesrv.xml”。

注意:此前会先判断 namesrvConfig.getRocketmqHome() 未设置时,会导致程序退出。不过这种情况仅发生在直接运行

com.alibaba.rocketmq.namesrv.NamesrvStartup#main,如果是脚本启动,会自动将该值设置为 mqnamesrv 脚本的上上层目录

实例化并初始化 NamesrvController

使用 NamesrvConfig、NettyServerConfig 实例化 NamesrvController 然后调用 com.alibaba.rocketmq.namesrv.NamesrvController#initialize 初始化,初始化失败时程序退出

否则增加 ShutdownHook ,在程序退出时记录日志,并调用 com.alibaba.rocketmq.namesrv.NamesrvController#shutdown 释放资源

启动 NamesrvController

调用 com.alibaba.rocketmq.namesrv.NamesrvController#start 启动线程工作

1.2. NamesrvController 初始化流程

加载 KvConfig

如果 NamesrvConfig 中 kvConfigPath 路径指定的文件内容不为空,则将文件存储的 json 内容加载进Map:HashMap>

实例化 NettyRemotingServer

rocketmq-remoting 模块中的 com.alibaba.rocketmq.remoting.netty.NettyRemotingServer#NettyRemotingServer(com.alibaba.rocketmq.remoting.netty.NettyServerConfig, com.alibaba.rocketmq.remoting.ChannelEventListener)

构造 RemotingExecutorThread 线程池

使用 java.util.concurrent.Executors#newFixedThreadPool(int, java.util.concurrent.ThreadFactory) 构造固定数目线程池,数目由 NettyServerConfig.getServerWorkerThreads 决定

注册

NamesrvConfig 中 clusterTest 为 true 时,调用 com.alibaba.rocketmq.remoting.RemotingServer#registerDefaultProcessor 将一个 com.alibaba.rocketmq.namesrv.processor.ClusterTestRequestProcessor 和 RemotingExecutorThread 线程池绑定起来;否则将一个 com.alibaba.rocketmq.namesrv.processor.DefaultRequestProcessor 和 RemotingExecutorThread 线程池绑定起来。绑定到 NettyRemotingServer 的 defaultRequestProcessor 上

两者区别: ClusterTestRequestProcessor 继承自 DefaultRequestProcessor 并重写了 getRouteInfoByTopic 方法,RouteInfoManager 中未能获取到 TopicRouteData 时,去集群上查询一次

启动定时任务

5s 后启动,每隔 10s 执行一次:清理失效的 Broker 信息

1m 后启动,每隔 10m 执行一次:在日志中 INFO 级别输出 KvConfig 的内容

1.3. NamesrvController 启动流程

实际上是启动 NamesrvController 的 NettyRemotingServer

构造 DefaultEventExecutorGroup

线程数目由 NettyServerConfig.getServerWorkerThreads 决定,线程名:”NettyServerCodecThread_” + this.threadIndex.incrementAndGet()

构造 BOSS 线程

线程数固定为1,线程名:NettyBoss_1

构造 WORKER 线程

如果是 Linux,且 NettyServerConfig 中 useEpollNativeSelector 为 true,返回一个 EpollEventLoopGroup,线程数目由 NettyServerConfig.getServerSelectorThreads 决定,线程名:String.format(“NettyServerEPOLLSelector_%d_%d”, threadTotal, this.threadIndex.incrementAndGet());否则返回一个 NioEventLoopGroup,线程数目由 NettyServerConfig.getServerSelectorThreads 决定,线程名:String.format(“NettyServerNIOSelector_%d_%d”, threadTotal, this.threadIndex.incrementAndGet())

配置 ServerBootstrap

ChannelOption.SO_BACKLOG:1024

ChannelOption.SO_REUSEADDR:true

ChannelOption.SO_KEEPALIVE:false

ChannelOption.SO_SNDBUF:nettyServerConfig.getServerSocketSndBufSize()

ChannelOption.SO_RCVBUF:nettyServerConfig.getServerSocketRcvBufSize()

ChannelOption.TCP_NODELAY:true 设置的是childOption

localAddress:new InetSocketAddress(this.nettyServerConfig.getListenPort())

childHandler:

NettyEncoder:编码

NettyDecoder:解码

IdleStateHandler:空闲处理,空闲时间由 nettyServerConfig.getServerChannelMaxIdleTimeSeconds() 决定

NettyConnetManageHandler:连接管理,记录连接日志,并处理连接事件(空闲事件)

NettyServerHandler:业务逻辑处理。根据 RemotingCommand 的 type 调用 com.alibaba.rocketmq.remoting.netty.NettyRemotingAbstract#processRequestCommand 或 com.alibaba.rocketmq.remoting.netty.NettyRemotingAbstract#processResponseCommand

NettyServerConfig 未将 serverPooledByteBufAllocatorEnable 设置为 false,则 ChannelOption.ALLOCATOR 设置为 PooledByteBufAllocator.DEFAULT,即默认开启 Netty 的池化 ByteBuf

启动 ServerBootstrap

调用 io.netty.bootstrap.AbstractBootstrap#bind()

启动定时任务

3s 后启动,每隔 1s 执行一次:处理超时请求


以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

游戏编程算法与技巧

游戏编程算法与技巧

【美】Sanjay Madhav / 刘瀚阳 / 电子工业出版社 / 2016-10 / 89

《游戏编程算法与技巧》介绍了大量今天在游戏行业中用到的算法与技术。《游戏编程算法与技巧》是为广大熟悉面向对象编程以及基础数据结构的游戏开发者所设计的。作者采用了一种独立于平台框架的方法来展示开发,包括2D 和3D 图形学、物理、人工智能、摄像机等多个方面的技术。《游戏编程算法与技巧》中内容几乎兼容所有游戏,无论这些游戏采用何种风格、开发语言和框架。 《游戏编程算法与技巧》的每个概念都是用C#......一起来看看 《游戏编程算法与技巧》 这本书的介绍吧!

JS 压缩/解压工具
JS 压缩/解压工具

在线压缩/解压 JS 代码

XML、JSON 在线转换
XML、JSON 在线转换

在线XML、JSON转换工具

HEX CMYK 转换工具
HEX CMYK 转换工具

HEX CMYK 互转工具