运维管理


1 集群搭建

1.1 单Master模式

这种方式风险较大,一旦Broker重启或者宕机时,会导致整个服务不可用。不建议线上环境使用,可以用于本地测试。

1)启动 NameServer
1
2
3
4
5
6
### 首先启动Name Server
$ nohup sh mqnamesrv &

### 验证Name Server 是否启动成功
$ tail -f ~/logs/rocketmqlogs/namesrv.log
The Name Server boot success...
2)启动 Broker
1
2
3
4
5
### 启动Broker
$ nohup sh bin/mqbroker -n localhost:9876 &
### 验证Name Server 是否启动成功,例如Broker的IP为:192.168.1.2,且名称为broker-a
$ tail -f ~/logs/rocketmqlogs/Broker.log
The broker[broker-a, 192.169.1.2:10911] boot success...

1.2 多Master模式

一个集群无Slave,全是Master,例如2个Master或者3个Master,这种模式的优缺点如下:

  • 优点:配置简单,单个Master宕机或重启维护对应用无影响,在磁盘配置为RAID10时,即使机器宕机不可恢复情况下,由于RAID10磁盘非常可靠,消息也不会丢(异步刷盘丢失少量消息,同步刷盘一条不丢),性能最高;

  • 缺点:单台机器宕机期间,这台机器上未被消费的消息在机器恢复之前不可订阅,消息实时性会受到影响。

1)启动NameServer

NameServer需要先于Broker启动,且如果在生产环境使用,为了保证高可用,建议一般规模的集群启动3个NameServer,各节点的启动命令相同,如下:

1
2
3
4
5
6
### 首先启动Name Server
$ nohup sh mqnamesrv &

### 验证Name Server 是否启动成功
$ tail -f ~/logs/rocketmqlogs/namesrv.log
The Name Server boot success...
2)启动Broker集群
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
### 在机器A,启动第一个Master,例如NameServer的IP为:192.168.1.1
$ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-noslave/broker-a.properties &

### 在机器B,启动第二个Master,例如NameServer的IP为:192.168.1.1
$ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-noslave/broker-b.properties &
...

````
如上启动命令是在单个NameServer情况下使用的。对于多个NameServer的集群,Broker启动命令中`-n`后面的地址列表用分号隔开即可,例如 `192.168.1.1:9876;192.161.2:9876`。

#### 1.3 多Master多Slave模式-异步复制
每个Master配置一个Slave,有多对Master-Slave,HA采用异步复制方式,主备有短暂消息延迟(毫秒级),这种模式的优缺点如下:

- 优点:即使磁盘损坏,消息丢失的非常少,且消息实时性不会受影响,同时Master宕机后,消费者仍然可以从Slave消费,而且此过程对应用透明,不需要人工干预,性能同多Master模式几乎一样;

- 缺点:Master宕机,磁盘损坏情况下会丢失少量消息。


##### 1)启动NameServer
```bash
### 首先启动Name Server
$ nohup sh mqnamesrv &

### 验证Name Server 是否启动成功
$ tail -f ~/logs/rocketmqlogs/namesrv.log
The Name Server boot success...
````

##### 2)启动 Broker 集群

```bash
### 在机器A,启动第一个Master,例如NameServer的IP为:192.168.1.1
$ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-async/broker-a.properties &

### 在机器B,启动第二个Master,例如NameServer的IP为:192.168.1.1
$ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-async/broker-b.properties &

### 在机器C,启动第一个Slave,例如NameServer的IP为:192.168.1.1
$ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-async/broker-a-s.properties &

### 在机器D,启动第二个Slave,例如NameServer的IP为:192.168.1.1
$ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-async/broker-b-s.properties &

1.4 多 Master 多 Slave 模式-同步双写

每个 Master 配置一个 Slave,有多对 Master-Slave,HA 采用同步双写方式,即只有主备都写成功,才向应用返回成功,这种模式的优缺点如下:

  • 优点:数据与服务都无单点故障,Master 宕机情况下,消息无延迟,服务可用性与数据可用性都非常高;

  • 缺点:性能比异步复制模式略低(大约低 10%左右),发送单个消息的 RT 会略高,且目前版本在主节点宕机后,备机不能自动切换为主机。

1)启动 NameServer
1
2
3
4
5
6
### 首先启动Name Server
$ nohup sh mqnamesrv &

### 验证Name Server 是否启动成功
$ tail -f ~/logs/rocketmqlogs/namesrv.log
The Name Server boot success...
2)启动 Broker 集群
1
2
3
4
5
6
7
8
9
10
11
### 在机器A,启动第一个Master,例如NameServer的IP为:192.168.1.1
$ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-a.properties &

### 在机器B,启动第二个Master,例如NameServer的IP为:192.168.1.1
$ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-b.properties &

### 在机器C,启动第一个Slave,例如NameServer的IP为:192.168.1.1
$ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-a-s.properties &

### 在机器D,启动第二个Slave,例如NameServer的IP为:192.168.1.1
$ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-b-s.properties &

以上 Broker 与 Slave 配对是通过指定相同的 BrokerName 参数来配对,Master 的 BrokerId 必须是 0,Slave 的 BrokerId 必须是大于 0 的数。另外一个 Master 下面可以挂载多个 Slave,同一 Master 下的多个 Slave 通过指定不同的 BrokerId 来区分。$ROCKETMQ_HOME 指的 RocketMQ 安装目录,需要用户自己设置此环境变量。

2 mqadmin 管理工具

注意:

  1. 执行命令方法:./mqadmin {command} {args}
  2. 几乎所有命令都需要配置-n 表示 NameServer 地址,格式为 ip:port
  3. 几乎所有命令都可以通过-h 获取帮助
  4. 如果既有 Broker 地址(-b)配置项又有 clusterName(-c)配置项,则优先以 Broker 地址执行命令,如果不配置 Broker 地址,则对集群中所有主机执行命令,只支持一个 Broker 地址。-b 格式为 ip:port,port 默认是 10911
  5. 在 tools 下可以看到很多命令,但并不是所有命令都能使用,只有在 MQAdminStartup 中初始化的命令才能使用,你也可以修改这个类,增加或自定义命令
  6. 由于版本更新问题,少部分命令可能未及时更新,遇到错误请直接阅读相关命令源码

2.1 Topic 相关

| 名称 | 含义 | 命令选项 | 说明 |
| —————- | ——————————————————————– | ——– | ———————————————————————————————————————– | — | —— |
| updateTopic | 创建更新 Topic 配置 | -b | Broker 地址,表示 topic 所在 Broker,只支持单台 Broker,地址为 ip:port |
| | | -c | cluster 名称,表示 topic 所在集群(集群可通过 clusterList 查询) |
| | | -h- | 打印帮助 |
| | | -n | NameServer 服务地址,格式 ip:port |
| | | -p | 指定新 topic 的读写权限( W=2 | R=4 | WR=6 ) |
| | | -r | 可读队列数(默认为 8) |
| | | -w | 可写队列数(默认为 8) |
| | | -t | topic 名称(名称只能使用字符 ^[a-zA-Z0-9_-]+$ ) |
| deleteTopic | 删除 Topic | -c | cluster 名称,表示删除某集群下的某个 topic (集群 可通过 clusterList 查询) |
| | | -h | 打印帮助 |
| | | -n | NameServer 服务地址,格式 ip:port |
| | | -t | topic 名称(名称只能使用字符 ^[a-zA-Z0-9_-]+$ ) |
| topicList | 查看 Topic 列表信息 | -h | 打印帮助 |
| | | -c | 不配置-c 只返回 topic 列表,增加-c 返回 clusterName, topic, consumerGroup 信息,即 topic 的所属集群和订阅关系,没有参数 |
| | | -n | NameServer 服务地址,格式 ip:port |
| topicRoute | 查看 Topic 路由信息 | -t | topic 名称 |
| | | -h | 打印帮助 |
| | | -n | NameServer 服务地址,格式 ip:port |
| topicStatus | 查看 Topic 消息队列 offset | -t | topic 名称 |
| | | -h | 打印帮助 |
| | | -n | NameServer 服务地址,格式 ip:port |
| topicClusterList | 查看 Topic 所在集群列表 | -t | topic 名称 |
| | | -h | 打印帮助 |
| | | -n | NameServer 服务地址,格式 ip:port |
| updateTopicPerm | 更新 Topic 读写权限 | -t | topic 名称 |
| | | -h | 打印帮助 |
| | | -n | NameServer 服务地址,格式 ip:port |
| | | -b | Broker 地址,表示 topic 所在 Broker,只支持单台 Broker,地址为 ip:port |
| | | -p | 指定新 topic 的读写权限( W=2 | R=4 | WR=6 ) |
| | | -c | cluster 名称,表示 topic 所在集群(集群可通过 clusterList 查询),-b 优先,如果没有-b,则对集群中所有 Broker 执行命令 |
| updateOrderConf | 从 NameServer 上创建、删除、获取特定命名空间的 kv 配置,目前还未启用 | -h | 打印帮助 |
| | | -n | NameServer 服务地址,格式 ip:port |
| | | -t | topic,键 |
| | | -v | orderConf,值 |
| | | -m | method,可选 get、put、delete |
| allocateMQ | 以平均负载算法计算消费者列表负载消息队列的负载结果 | -t | topic 名称 |
| | | -h | 打印帮助 |
| | | -n | NameServer 服务地址,格式 ip:port |
| | | -i | ipList,用逗号分隔,计算这些 ip 去负载 Topic 的消息队列 |
| statsAll | 打印 Topic 订阅关系、TPS、积累量、24h 读写总量等信息 | -h | 打印帮助 |
| | | -n | NameServer 服务地址,格式 ip:port |
| | | -a | 是否只打印活跃 topic |
| | | -t | 指定 topic |

2.2 集群相关

| 名称 | 含义 | 命令选项 | 说明 |
| ———– | ———————————————————– | ——– | ——————————————————————————————– | —————- |
| clusterList | 查看集群信息,集群、BrokerName、BrokerId、TPS 等信息 | -m | 打印更多信息 (增加打印出如下信息 #InTotalYest, #OutTotalYest, #InTotalToday ,#OutTotalToday) |
| | | -h | 打印帮助 |
| | | -n | NameServer 服务地址,格式 ip:port |
| | | -i | 打印间隔,单位秒 |
| clusterRT | 发送消息检测集群各 Broker RT。消息发往${BrokerName} Topic。 | -a | amount,每次探测的总数,RT = 总时间 / amount |
| | | -s | 消息大小,单位 B |
| | | -c | 探测哪个集群 |
| | | -p | 是否打印格式化日志,以 | 分割,默认不打印 |
| | | -h | 打印帮助 |
| | | -m | 所属机房,打印使用 |
| | | -i | 发送间隔,单位秒 |
| | | -n | NameServer 服务地址,格式 ip:port |

2.3 Broker 相关

名称含义命令选项说明
updateBrokerConfig更新 Broker 配置文件,会修改 Broker.conf-bBroker 地址,格式为 ip:port
-ccluster 名称
-kkey 值
-vvalue 值
-h打印帮助
-nNameServer 服务地址,格式 ip:port
brokerStatus查看 Broker 统计信息、运行状态(你想要的信息几乎都在里面)-bBroker 地址,地址为 ip:port
-h打印帮助
-nNameServer 服务地址,格式 ip:port
brokerConsumeStatsBroker 中各个消费者的消费情况,按 Message Queue 维度返回 Consume Offset,Broker Offset,Diff,TImestamp 等信息-bBroker 地址,地址为 ip:port
-t请求超时时间
-ldiff 阈值,超过阈值才打印
-o是否为顺序 topic,一般为 false
-h打印帮助
-nNameServer 服务地址,格式 ip:port
getBrokerConfig获取 Broker 配置-bBroker 地址,地址为 ip:port
-nNameServer 服务地址,格式 ip:port
wipeWritePerm从 NameServer 上清除 Broker 写权限-bBrokerName
-nNameServer 服务地址,格式 ip:port
-h打印帮助
cleanExpiredCQ清理 Broker 上过期的 Consume Queue,如果手动减少对列数可能产生过期队列-nNameServer 服务地址,格式 ip:port
-h打印帮助
-bBroker 地址,地址为 ip:port
-c集群名称
cleanUnusedTopic清理 Broker 上不使用的 Topic,从内存中释放 Topic 的 Consume Queue,如果手动删除 Topic 会产生不使用的 Topic-nNameServer 服务地址,格式 ip:port
-h打印帮助
-bBroker 地址,地址为 ip:port
-c集群名称
sendMsgStatus向 Broker 发消息,返回发送状态和 RT-nNameServer 服务地址,格式 ip:port
-h打印帮助
-bBrokerName,注意不同于 Broker 地址
-s消息大小,单位 B
-c发送次数

2.4 消息相关

名称含义命令选项说明
queryMsgById根据 offsetMsgId 查询 msg,如果使用开源控制台,应使用 offsetMsgId,此命令还有其他参数,具体作用请阅读 QueryMsgByIdSubCommand。-imsgId
-h打印帮助
-nNameServer 服务地址,格式 ip:port
queryMsgByKey根据消息 Key 查询消息-kmsgKey
-tTopic 名称
-h打印帮助
-nNameServer 服务地址,格式 ip:port
queryMsgByOffset根据 Offset 查询消息-bBroker 名称,(这里需要注意 填写的是 Broker 的名称,不是 Broker 的地址,Broker 名称可以在 clusterList 查到)
-iquery 队列 id
-ooffset 值
-ttopic 名称
-h打印帮助
-nNameServer 服务地址,格式 ip:port
queryMsgByUniqueKey根据 msgId 查询,msgId 不同于 offsetMsgId,区别详见常见运维问题。-g,-d 配合使用,查到消息后尝试让特定的消费者消费消息并返回消费结果-h打印帮助
-nNameServer 服务地址,格式 ip:port
-iuniqe msg id
-gconsumerGroup
-dclientId
-ttopic 名称
checkMsgSendRT检测向 topic 发消息的 RT,功能类似 clusterRT-h打印帮助
-nNameServer 服务地址,格式 ip:port
-ttopic 名称
-a探测次数
-s消息大小
sendMessage发送一条消息,可以根据配置发往特定 Message Queue,或普通发送。-h打印帮助
-nNameServer 服务地址,格式 ip:port
-ttopic 名称
-pbody,消息体
-kkeys
-ctags
-bBrokerName
-iqueueId
consumeMessage消费消息。可以根据 offset、开始&结束时间戳、消息队列消费消息,配置不同执行不同消费逻辑,详见 ConsumeMessageCommand。-h打印帮助
-nNameServer 服务地址,格式 ip:port
-ttopic 名称
-bBrokerName
-o从 offset 开始消费
-iqueueId
-g消费者分组
-s开始时间戳,格式详见-h
-d结束时间戳
-c消费多少条消息
printMsg从 Broker 消费消息并打印,可选时间段-h打印帮助
-nNameServer 服务地址,格式 ip:port
-ttopic 名称
-c字符集,例如 UTF-8
-ssubExpress,过滤表达式
-b开始时间戳,格式参见-h
-e结束时间戳
-d是否打印消息体
printMsgByQueue类似 printMsg,但指定 Message Queue-h打印帮助
-nNameServer 服务地址,格式 ip:port
-ttopic 名称
-iqueueId
-aBrokerName
-c字符集,例如 UTF-8
-ssubExpress,过滤表达式
-b开始时间戳,格式参见-h
-e结束时间戳
-p是否打印消息
-d是否打印消息体
-f是否统计 tag 数量并打印
resetOffsetByTime按时间戳重置 offset,Broker 和 consumer 都会重置-h打印帮助
-nNameServer 服务地址,格式 ip:port
-g消费者分组
-ttopic 名称
-s重置为此时间戳对应的 offset
-f是否强制重置,如果 false,只支持回溯 offset,如果 true,不管时间戳对应 offset 与 consumeOffset 关系
-c是否重置 c++客户端 offset

2.5 消费者、消费组相关

名称含义命令选项说明
consumerProgress查看订阅组消费状态,可以查看具体的 client IP 的消息积累量-g消费者所属组名
-s是否打印 client IP
-h打印帮助
-nNameServer 服务地址,格式 ip:port
consumerStatus查看消费者状态,包括同一个分组中是否都是相同的订阅,分析 Process Queue 是否堆积,返回消费者 jstack 结果,内容较多,使用者参见 ConsumerStatusSubCommand-h打印帮助
-nNameServer 服务地址,格式 ip:port
-gconsumer group
-iclientId
-s是否执行 jstack
updateSubGroup更新或创建订阅关系-nNameServer 服务地址,格式 ip:port
-h打印帮助
-bBroker 地址
-c集群名称
-g消费者分组名称
-s分组是否允许消费
-m是否从最小 offset 开始消费
-d是否是广播模式
-q重试队列数量
-r最大重试次数
-i当 slaveReadEnable 开启时有效,且还未达到从 slave 消费时建议从哪个 BrokerId 消费,可以配置备机 id,主动从备机消费
-w如果 Broker 建议从 slave 消费,配置决定从哪个 slave 消费,配置 BrokerId,例如 1
-a当消费者数量变化时是否通知其他消费者负载均衡
deleteSubGroup从 Broker 删除订阅关系-nNameServer 服务地址,格式 ip:port
-h打印帮助
-bBroker 地址
-c集群名称
-g消费者分组名称
cloneGroupOffset在目标群组中使用源群组的 offset-nNameServer 服务地址,格式 ip:port
-h打印帮助
-s源消费者组
-d目标消费者组
-ttopic 名称
-o暂未使用

2.6 连接相关

名称含义命令选项说明
consumerConnection查询 Consumer 的网络连接-g消费者所属组名
-nNameServer 服务地址,格式 ip:port
-h打印帮助
producerConnection查询 Producer 的网络连接-g生产者所属组名
-t主题名称
-nNameServer 服务地址,格式 ip:port
-h打印帮助

2.7 NameServer 相关

名称含义命令选项说明
updateKvConfig更新 NameServer 的 kv 配置,目前还未使用-s命名空间
-kkey
-vvalue
-nNameServer 服务地址,格式 ip:port
-h打印帮助
deleteKvConfig删除 NameServer 的 kv 配置-s命名空间
-kkey
-nNameServer 服务地址,格式 ip:port
-h打印帮助
getNamesrvConfig获取 NameServer 配置-nNameServer 服务地址,格式 ip:port
-h打印帮助
updateNamesrvConfig修改 NameServer 配置-nNameServer 服务地址,格式 ip:port
-h打印帮助
-kkey
-vvalue

2.8 其他

名称含义命令选项说明
startMonitoring开启监控进程,监控消息误删、重试队列消息数等-nNameServer 服务地址,格式 ip:port
-h打印帮助

3 运维常见问题

3.1 RocketMQ 的 mqadmin 命令报错问题

问题描述:有时候在部署完 RocketMQ 集群后,尝试执行“mqadmin”一些运维命令,会出现下面的异常信息:

1
org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to <null> failed

解决方法:可以在部署 RocketMQ 集群的虚拟机上执行export NAMESRV_ADDR=ip:9876(ip 指的是集群中部署 NameServer 组件的机器 ip 地址)命令之后再使用“mqadmin”的相关命令进行查询,即可得到结果。

3.2 RocketMQ 生产端和消费端版本不一致导致不能正常消费的问题

问题描述:同一个生产端发出消息,A 消费端可消费,B 消费端却无法消费,rocketMQ Console 中出现:

1
Not found the consumer group consume stats, because return offset table is empty, maybe the consumer not consume any message的异常消息。

解决方案:RocketMQ 的 jar 包:rocketmq-client 等包应该保持生产端,消费端使用相同的 version。

3.3 新增一个 topic 的消费组时,无法消费历史消息的问题

问题描述:当同一个 topic 的新增消费组启动时,消费的消息是当前的 offset 的消息,并未获取历史消息。

解决方案:rocketmq 默认策略是从消息队列尾部,即跳过历史消息。如果想消费历史消息,则需要设置:org.apache.rocketmq.client.consumer.DefaultMQPushConsumer#setConsumeFromWhere。常用的有以下三种配置:

  • 默认配置,一个新的订阅组第一次启动从队列的最后位置开始消费,后续再启动接着上次消费的进度开始消费,即跳过历史消息;
1
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
  • 一个新的订阅组第一次启动从队列的最前位置开始消费,后续再启动接着上次消费的进度开始消费,即消费 Broker 未过期的历史消息;
1
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
  • 一个新的订阅组第一次启动从指定时间点开始消费,后续再启动接着上次消费的进度开始消费,和 consumer.setConsumeTimestamp()配合使用,默认是半个小时以前;
1
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);

3.4 如何开启从 Slave 读数据功能

在某些情况下,Consumer 需要将消费位点重置到 1-2 天前,这时在内存有限的 Master Broker 上,CommitLog 会承载比较重的 IO 压力,影响到该 Broker 的其它消息的读与写。可以开启slaveReadEnable=true,当 Master Broker 发现 Consumer 的消费位点与 CommitLog 的最新值的差值的容量超过该机器内存的百分比(accessMessageInMemoryMaxRatio=40%),会推荐 Consumer 从 Slave Broker 中去读取数据,降低 Master Broker 的 IO。

3.5 性能调优问题

异步刷盘建议使用自旋锁,同步刷盘建议使用重入锁,调整 Broker 配置项useReentrantLockWhenPutMessage,默认为 false;异步刷盘建议开启TransientStorePoolEnable;建议关闭 transferMsgByHeap,提高拉消息效率;同步刷盘建议适当增大sendMessageThreadPoolNums,具体配置需要经过压测。

3.6 在 RocketMQ 中 msgId 和 offsetMsgId 的含义与区别

使用 RocketMQ 完成生产者客户端消息发送后,通常会看到如下日志打印信息:

1
SendResult [sendStatus=SEND_OK, msgId=0A42333A0DC818B4AAC246C290FD0000, offsetMsgId=0A42333A00002A9F000000000134F1F5, messageQueue=MessageQueue [topic=topicTest1, BrokerName=mac.local, queueId=3], queueOffset=4]
  • msgId,对于客户端来说 msgId 是由客户端 producer 实例端生成的,具体来说,调用方法MessageClientIDSetter.createUniqIDBuffer()生成唯一的 Id;
  • offsetMsgId,offsetMsgId 是由 Broker 服务端在写入消息时生成的(采用”IP 地址+Port 端口”与“CommitLog 的物理偏移量地址”做了一个字符串拼接),其中 offsetMsgId 就是在 RocketMQ 控制台直接输入查询的那个 messageId。