Operation Management


1 Deploy cluster

1.1 Single Master mode

This mode is risky: when the Broker restarts or goes down, the whole service becomes unavailable. It is not recommended for production environments, but it can be used for local testing.

1)Start NameServer
### Start Name Server
$ nohup sh mqnamesrv &
 
### check whether Name Server is successfully started
$ tail -f ~/logs/rocketmqlogs/namesrv.log
The Name Server boot success...
2)Start Broker
### start Broker
$ nohup sh bin/mqbroker -n localhost:9876 &

Check whether the Broker started successfully, eg: the Broker's IP is 192.168.1.2 and the Broker's name is broker-a

$ tail -f ~/logs/rocketmqlogs/Broker.log
The broker[broker-a, 192.168.1.2:10911] boot success...

1.2 Multi Master mode

The cluster contains only Master nodes and no Slave nodes, eg: 2 Master nodes or 3 Master nodes. The advantages and disadvantages of this mode are shown below:

  • advantages: simple configuration; a single Master going down or restarting does not affect the application. With RAID10 disks, messages are not lost even if the machine breaks down and cannot be recovered, because RAID10 is highly reliable (asynchronous flush loses a small number of messages, synchronous flush loses none). This mode has the highest performance.

  • disadvantages: while a machine is down, messages on that machine that have not yet been consumed cannot be subscribed to until it recovers, which affects message timeliness.

1)Start NameServer

NameServer should be started before the Brokers. In a production environment, we recommend starting 3 NameServer nodes for high availability. The startup command is the same on every node, as shown below:

### start Name Server
$ nohup sh mqnamesrv &
 
### check whether Name Server is successfully started
$ tail -f ~/logs/rocketmqlogs/namesrv.log
The Name Server boot success...
2)Start Broker cluster
### start the first Master on machine A, eg: NameServer's IP is 192.168.1.1
$ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-noslave/broker-a.properties &

### start the second Master on machine B, eg: NameServer's IP is 192.168.1.1
$ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-noslave/broker-b.properties &


The commands above apply to a single NameServer. For a NameServer cluster, pass multiple addresses separated by semicolons after -n in the Broker start command.
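As a minimal sketch of the semicolon-separated form, the script below joins three NameServer addresses and prints the resulting Broker start command instead of running it. The three addresses are placeholders, not values from a real deployment. The -n value is quoted because an unquoted semicolon would be read by the shell as a command separator.

```shell
#!/bin/sh
# Join placeholder NameServer addresses with ';' (addresses are assumed).
NAMESRV_LIST=""
for addr in 192.168.1.1:9876 192.168.1.2:9876 192.168.1.3:9876; do
  # Prepend the existing list and a ';' only when the list is not empty.
  NAMESRV_LIST="${NAMESRV_LIST:+$NAMESRV_LIST;}$addr"
done

# Dry run: print the command rather than starting a Broker.
echo "nohup sh mqbroker -n \"$NAMESRV_LIST\" -c \$ROCKETMQ_HOME/conf/2m-noslave/broker-a.properties &"
```

Copy the printed line into a terminal on the Broker machine once the addresses match your NameServer nodes.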

1.3 Multi Master Multi Slave mode - async replication

Each Master node is paired with one Slave node, so the cluster has several Master-Slave groups. HA uses asynchronous replication, and the Slave lags behind the Master by a short delay (millisecond level). The advantages and disadvantages of this mode are shown below:

  • advantages: very few messages are lost even if the disk is damaged; message timeliness is not affected; when the Master is down, Consumers can still consume from the Slave, which is transparent to the application and needs no human intervention; performance is almost equal to Multi Master mode.

  • disadvantages: a small number of messages are lost when the Master goes down and its disk is damaged.

1)Start NameServer
### start Name Server
$ nohup sh mqnamesrv &
 
### check whether Name Server is successfully started
$ tail -f ~/logs/rocketmqlogs/namesrv.log
The Name Server boot success...
2)Start Broker cluster
### start first Master on machine A, eg: NameServer's IP is 192.168.1.1
$ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-async/broker-a.properties &
 
### start second Master on machine B, eg: NameServer's IP is 192.168.1.1
$ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-async/broker-b.properties &
 
### start first Slave on machine C, eg: NameServer's IP is 192.168.1.1
$ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-async/broker-a-s.properties &
 
### start second Slave on machine D, eg: NameServer's IP is 192.168.1.1
$ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-async/broker-b-s.properties &

1.4 Multi Master Multi Slave mode - synchronous double write

Each Master node is paired with one Slave node, so the cluster has several Master-Slave groups. HA uses synchronous double write: a write is reported to the application as successful only when both Master and Slave have written it. The advantages and disadvantages of this mode are shown below:

  • advantages: neither data nor service has a single point of failure; messages are not delayed even if the Master is down; service availability and data availability are both very high;

  • disadvantages: performance is 10% lower than in async replication mode and send latency is slightly higher; in the current version, there is no automatic Master-Slave switch when the Master goes down.

1)Start NameServer
### start Name Server
$ nohup sh mqnamesrv &
 
### check whether Name Server is successfully started
$ tail -f ~/logs/rocketmqlogs/namesrv.log
The Name Server boot success...
2)Start Broker cluster
### start first Master on machine A, eg: NameServer's IP is 192.168.1.1
$ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-a.properties &
 
### start second Master on machine B, eg: NameServer's IP is 192.168.1.1
$ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-b.properties &
 
### start first Slave on machine C, eg: NameServer's IP is 192.168.1.1
$ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-a-s.properties &
 
### start second Slave on machine D, eg: NameServer's IP is 192.168.1.1
$ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-b-s.properties &

The Brokers above are paired as Master and Slave by specifying the same BrokerName. A Master's BrokerId must be 0, and a Slave's BrokerId must be larger than 0. A Master can also have multiple Slaves, each with a distinct BrokerId. $ROCKETMQ_HOME is RocketMQ's installation directory; users need to set this environment variable.
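To illustrate the pairing rule, the sketch below writes two minimal configuration files to a temporary directory and checks them against it. The file contents are assumptions modelled on the sample files under conf/2m-2s-async (keys brokerClusterName, brokerName, brokerId, brokerRole); the values are illustrative and should be compared with your own files.

```shell
#!/bin/sh
# Write two illustrative config files (contents are assumptions, see above).
dir=$(mktemp -d)
cat > "$dir/broker-a.properties" <<'EOF'
brokerClusterName=DefaultCluster
brokerName=broker-a
brokerId=0
brokerRole=ASYNC_MASTER
EOF
cat > "$dir/broker-a-s.properties" <<'EOF'
brokerClusterName=DefaultCluster
brokerName=broker-a
brokerId=1
brokerRole=SLAVE
EOF

# Read the value of one key from a properties file.
prop() { sed -n "s/^$1=//p" "$2"; }

master_name=$(prop brokerName "$dir/broker-a.properties")
slave_name=$(prop brokerName "$dir/broker-a-s.properties")
master_id=$(prop brokerId "$dir/broker-a.properties")
slave_id=$(prop brokerId "$dir/broker-a-s.properties")

# The same name pairs the two; the Master has id 0, the Slave an id above 0.
if [ "$master_name" = "$slave_name" ] && [ "$master_id" -eq 0 ] && [ "$slave_id" -gt 0 ]; then
  pairing="ok"
else
  pairing="mismatch"
fi
echo "pairing: $pairing"
rm -rf "$dir"
```

A second Slave for the same Master would reuse brokerName=broker-a with another id greater than 0, for example 2.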

2 mqadmin management tool

Notes:

  1. Command format: ./mqadmin {command} {args}
  2. Almost all commands require -n to specify the NameServer address, in ip:port format.
  3. Almost all commands print help information with -h.
  4. If a command accepts both a Broker address (-b) and a cluster name (-c), the Broker address takes precedence. If no Broker address is given, the command is executed on all hosts in the cluster. Only one Broker address is supported. The -b format is ip:port, and the default port is 10911.
  5. There are many commands under tools, but not all of them can be used; only the commands initialized in MQAdminStartup are available. You can modify this class to add or customize commands.
  6. Because of version updates, a few commands have not been updated in time; if an error occurs, refer to the source code directly.

2.1 Topic

| name | meaning | command items | explanation |
|---|---|---|---|
| updateTopic | create or update a Topic's config | -b | Broker address, the Broker where the topic is located; only a single Broker is supported; format is ip:port |
|  |  | -c | cluster name, the cluster the topic belongs to (query cluster info with clusterList) |
|  |  | -h | print help info |
|  |  | -n | NameServer service address, format is ip:port |
|  |  | -p | read/write permission assigned to the new topic (W=2, R=4, WR=6) |
|  |  | -r | number of readable queues (default is 8) |
|  |  | -w | number of writable queues (default is 8) |
|  |  | -t | topic name (may only contain characters matching ^[a-zA-Z0-9_-]+$ ) |
| deleteTopic | delete a Topic | -c | cluster name, the cluster from which the topic is deleted (query cluster info with clusterList) |
|  |  | -h | print help info |
|  |  | -n | NameServer service address, format is ip:port |
|  |  | -t | topic name (may only contain characters matching ^[a-zA-Z0-9_-]+$ ) |
| topicList | query the Topic list | -h | print help info |
|  |  | -c | without -c, only the topic list is returned; with -c, the cluster name, topic name, and consumer group name are returned |
|  |  | -n | NameServer service address, format is ip:port |
| topicRoute | query a Topic's route info | -t | topic name |
|  |  | -h | print help info |
|  |  | -n | NameServer service address, format is ip:port |
| topicStatus | query a Topic's offsets | -t | topic name |
|  |  | -h | print help info |
|  |  | -n | NameServer service address, format is ip:port |
| topicClusterList | query the list of clusters the Topic belongs to | -t | topic name |
|  |  | -h | print help info |
|  |  | -n | NameServer service address, format is ip:port |
| updateTopicPerm | update a Topic's produce and consume permission | -t | topic name |
|  |  | -h | print help info |
|  |  | -n | NameServer service address, format is ip:port |
|  |  | -b | address of the Broker the topic belongs to; only a single Broker is supported; format is ip:port |
|  |  | -p | read/write permission assigned to the topic (W=2, R=4, WR=6) |
|  |  | -c | cluster name, the cluster the topic belongs to (query cluster info with clusterList); if -b is not given, the command is executed on all Brokers |
| updateOrderConf | create, delete, or get the kv config of a specified namespace from NameServer; not enabled at present | -h | print help info |
|  |  | -n | NameServer service address, format is ip:port |
|  |  | -t | topic, used as the key |
|  |  | -v | orderConf, used as the value |
|  |  | -m | method: get, put, or delete |
| allocateMQ | calculate the rebalance result for a consumer list using the average rebalance algorithm | -t | topic name |
|  |  | -h | print help info |
|  |  | -n | NameServer service address, format is ip:port |
|  |  | -i | ipList, separated by commas; calculates which queues of the topic each IP will load |
| statsAll | print Topic subscription info, TPS, message backlog size, read and write counts over the last 24h, etc. | -h | print help info |
|  |  | -n | NameServer service address, format is ip:port |
|  |  | -a | whether to print only active topics |
|  |  | -t | specify a topic |
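The -p values listed for updateTopic (W=2, R=4, WR=6) behave like bit flags, with WR being the bitwise OR of W and R. The sketch below derives that value, checks the topic name against the allowed character set, and prints a matching updateTopic command rather than executing it. The NameServer address, the cluster name DefaultCluster, and the topic name TopicTest are placeholders.

```shell
#!/bin/sh
PERM_W=2                       # write permission
PERM_R=4                       # read permission
PERM_WR=$((PERM_W | PERM_R))   # read and write

NAMESRV="192.168.1.1:9876"     # placeholder NameServer address
CLUSTER="DefaultCluster"       # placeholder cluster name
TOPIC="TopicTest"              # placeholder topic name

# Topic names are limited to ^[a-zA-Z0-9_-]+$ ; reject anything else.
case "$TOPIC" in
  *[!a-zA-Z0-9_-]*|"") echo "invalid topic name: $TOPIC" >&2; exit 1 ;;
esac

# Dry run: build and print the command instead of running it.
CMD="./mqadmin updateTopic -n $NAMESRV -c $CLUSTER -t $TOPIC -r 8 -w 8 -p $PERM_WR"
echo "$CMD"
```

Replacing -c with -b and a Broker address would target a single Broker instead of the whole cluster, as described in the notes at the start of this chapter.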

2.2 Cluster

| name | meaning | command items | explanation |
|---|---|---|---|
| clusterList | query cluster info, including cluster, BrokerName, BrokerId, TPS, etc. | -m | print more info (eg: #InTotalYest, #OutTotalYest, #InTotalToday, #OutTotalToday) |
|  |  | -h | print help info |
|  |  | -n | NameServer service address, format is ip:port |
|  |  | -i | print interval, in seconds |
| clusterRT | send messages to measure the RT of each Broker in the cluster. Messages are sent to the ${BrokerName} Topic. | -a | amount, the number of probes; RT = total time / amount |
|  |  | -s | message size, in bytes |
|  |  | -c | cluster to probe |
|  |  | -p | whether to print a formatted log, with fields separated by vertical bars; off by default |
|  |  | -h | print help info |
|  |  | -m | machine room it belongs to; used only for printing |
|  |  | -i | send interval, in seconds |
|  |  | -n | NameServer service address, format is ip:port |

2.3 Broker

| name | meaning | command items | explanation |
|---|---|---|---|
| updateBrokerConfig | update the Broker's config file; this modifies Broker.conf | -b | Broker address, format is ip:port |
|  |  | -c | cluster name |
|  |  | -k | key |
|  |  | -v | value |
|  |  | -h | print help info |
|  |  | -n | NameServer service address, format is ip:port |
| brokerStatus | get the Broker's statistics and running status (covers almost all the information you may need) | -b | Broker address, format is ip:port |
|  |  | -h | print help info |
|  |  | -n | NameServer service address, format is ip:port |
| brokerConsumeStats | consumer info on the Broker, including Consume Offset, Broker Offset, Diff, and Timestamp, ordered by Message Queue | -b | Broker address, format is ip:port |
|  |  | -t | request timeout |
|  |  | -l | diff threshold; entries are printed only when the diff exceeds it |
|  |  | -o | whether the topic is ordered; generally false |
|  |  | -h | print help info |
|  |  | -n | NameServer service address, format is ip:port |
| getBrokerConfig | get the Broker's config | -b | Broker address, format is ip:port |
|  |  | -n | NameServer service address, format is ip:port |
| wipeWritePerm | revoke the Broker's write permission from NameServer | -b | Broker address, format is ip:port |
|  |  | -n | NameServer service address, format is ip:port |
|  |  | -h | print help info |
| cleanExpiredCQ | clean expired Consume Queues on the Broker, which may be produced when the queue count is decreased | -n | NameServer service address, format is ip:port |
|  |  | -h | print help info |
|  |  | -b | Broker address, format is ip:port |
|  |  | -c | cluster name |
| cleanUnusedTopic | clean unused Topics on the Broker, releasing the memory held by the Consume Queues of Topics that were deleted manually | -n | NameServer service address, format is ip:port |
|  |  | -h | print help info |
|  |  | -b | Broker address, format is ip:port |
|  |  | -c | cluster name |
| sendMsgStatus | send messages to the Broker and return the send status and RT | -n | NameServer service address, format is ip:port |
|  |  | -h | print help info |
|  |  | -b | BrokerName (note: not the Broker address) |
|  |  | -s | message size, in bytes |
|  |  | -c | send count |

2.4 Message

| name | meaning | command items | explanation |
|---|---|---|---|
| queryMsgById | query a message by offsetMsgId. If you use the open-source console, use offsetMsgId. See QueryMsgByIdSubCommand for details. | -i | msgId |
|  |  | -h | print help info |
|  |  | -n | NameServer service address, format is ip:port |
| queryMsgByKey | query messages by message Key | -k | msgKey |
|  |  | -t | topic name |
|  |  | -h | print help info |
|  |  | -n | NameServer service address, format is ip:port |
| queryMsgByOffset | query a message by Offset | -b | Broker name (not the Broker address; Broker names can be queried with clusterList) |
|  |  | -i | queue id to query |
|  |  | -o | offset value |
|  |  | -t | topic name |
|  |  | -h | print help info |
|  |  | -n | NameServer service address, format is ip:port |
| queryMsgByUniqueKey | query by msgId, which is different from offsetMsgId; see "Frequently asked questions about operations" for details. Use -g and -d to have the specified consumer return its consume result. | -h | print help info |
|  |  | -n | NameServer service address, format is ip:port |
|  |  | -i | unique msg id |
|  |  | -g | consumerGroup |
|  |  | -d | clientId |
|  |  | -t | topic name |
| checkMsgSendRT | measure the RT of sending messages to a topic; similar to clusterRT | -h | print help info |
|  |  | -n | NameServer service address, format is ip:port |
|  |  | -t | topic name |
|  |  | -a | number of probes |
|  |  | -s | message size |
| sendMessage | send a message; it can also be sent to a specified Message Queue | -h | print help info |
|  |  | -n | NameServer service address, format is ip:port |
|  |  | -t | topic name |
|  |  | -p | body, the message body |
|  |  | -k | keys |
|  |  | -c | tags |
|  |  | -b | BrokerName |
|  |  | -i | queueId |
| consumeMessage | consume messages. The consume logic differs depending on the offset, start and end timestamps, and message queue given; see ConsumeMessageCommand for details. | -h | print help info |
|  |  | -n | NameServer service address, format is ip:port |
|  |  | -t | topic name |
|  |  | -b | BrokerName |
|  |  | -o | offset to start consuming from |
|  |  | -i | queueId |
|  |  | -g | consumer group |
|  |  | -s | start timestamp; see -h for the format |
|  |  | -d | end timestamp |
|  |  | -c | number of messages to consume |
| printMsg | consume messages from the Broker and print them; a time range is supported | -h | print help info |
|  |  | -n | NameServer service address, format is ip:port |
|  |  | -t | topic name |
|  |  | -c | charset, eg: UTF-8 |
|  |  | -s | subExpress, filter expression |
|  |  | -b | start timestamp; see -h for the format |
|  |  | -e | end timestamp |
|  |  | -d | whether to print the message body |
| printMsgByQueue | similar to printMsg, but a Message Queue must be specified | -h | print help info |
|  |  | -n | NameServer service address, format is ip:port |
|  |  | -t | topic name |
|  |  | -i | queueId |
|  |  | -a | BrokerName |
|  |  | -c | charset, eg: UTF-8 |
|  |  | -s | subExpress, filter expression |
|  |  | -b | start timestamp; see -h for the format |
|  |  | -e | end timestamp |
|  |  | -p | whether to print the message |
|  |  | -d | whether to print the message body |
|  |  | -f | whether to count and print tags |
| resetOffsetByTime | reset offsets by timestamp; both the Broker and the consumers are reset | -h | print help info |
|  |  | -n | NameServer service address, format is ip:port |
|  |  | -g | consumer group |
|  |  | -t | topic name |
|  |  | -s | reset to the offset corresponding to this timestamp |
|  |  | -f | whether to force the reset; if false, only a plain offset reset is allowed; if true, the relationship between the timestamp and the consumer offset is ignored |
|  |  | -c | whether to reset the offset of C++ SDK clients |

2.5 Consumer, Consumer Group

| name | meaning | command items | explanation |
|---|---|---|---|
| consumerProgress | query subscription status; can show the message backlog of a specific client IP | -g | consumer group name |
|  |  | -s | whether to print the client IP |
|  |  | -h | print help info |
|  |  | -n | NameServer service address, format is ip:port |
| consumerStatus | query consumer status, including message backlog and the consumer's jstack result (see ConsumerStatusSubCommand) | -h | print help info |
|  |  | -n | NameServer service address, format is ip:port |
|  |  | -g | consumer group |
|  |  | -i | clientId |
|  |  | -s | whether to execute jstack |
| updateSubGroup | create or update subscription info | -n | NameServer service address, format is ip:port |
|  |  | -h | print help info |
|  |  | -b | Broker address |
|  |  | -c | cluster name |
|  |  | -g | consumer group name |
|  |  | -s | whether the consumer group is allowed to consume |
|  |  | -m | whether to start consuming from the minimal offset |
|  |  | -d | whether broadcast mode is used |
|  |  | -q | capacity of the retry queue |
|  |  | -r | max retry count |
|  |  | -i | takes effect when slaveReadEnable is enabled and the consumer is not yet consuming from a Slave; a Slave id can be specified to suggest consuming from that Slave node |
|  |  | -w | when the Broker suggests consuming from a Slave, this value selects the Slave node by BrokerId, eg: 1 |
|  |  | -a | whether to notify other consumers to rebalance when the number of consumers changes |
| deleteSubGroup | delete subscription info from the Broker | -n | NameServer service address, format is ip:port |
|  |  | -h | print help info |
|  |  | -b | Broker address |
|  |  | -c | cluster name |
|  |  | -g | consumer group name |
| cloneGroupOffset | use the source group's offset in the target group | -n | NameServer service address, format is ip:port |
|  |  | -h | print help info |
|  |  | -s | source consumer group |
|  |  | -d | target consumer group |
|  |  | -t | topic name |
|  |  | -o | not used at present |

2.6 Connection

| name | meaning | command items | explanation |
|---|---|---|---|
| consumerConnection | query the Consumer's connections | -g | consumer group name |
|  |  | -n | NameServer service address, format is ip:port |
|  |  | -h | print help info |
| producerConnection | query the Producer's connections | -g | producer group name |
|  |  | -t | topic name |
|  |  | -n | NameServer service address, format is ip:port |
|  |  | -h | print help info |

2.7 NameServer

| name | meaning | command items | explanation |
|---|---|---|---|
| updateKvConfig | update NameServer's kv config; not used at present | -s | namespace |
|  |  | -k | key |
|  |  | -v | value |
|  |  | -n | NameServer service address, format is ip:port |
|  |  | -h | print help info |
| deleteKvConfig | delete NameServer's kv config | -s | namespace |
|  |  | -k | key |
|  |  | -n | NameServer service address, format is ip:port |
|  |  | -h | print help info |
| getNamesrvConfig | get NameServer's config | -n | NameServer service address, format is ip:port |
|  |  | -h | print help info |
| updateNamesrvConfig | modify NameServer's config | -n | NameServer service address, format is ip:port |
|  |  | -h | print help info |
|  |  | -k | key |
|  |  | -v | value |

2.8 Other

| name | meaning | command items | explanation |
|---|---|---|---|
| startMonitoring | start the monitoring process, which monitors message deletion and the number of retried messages in the queue | -n | NameServer service address, format is ip:port |
|  |  | -h | print help info |

3 Frequently asked questions about operations

3.1 RocketMQ's mqadmin command error

Problem description: after deploying a RocketMQ cluster, executing mqadmin throws the following exception:

org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to  failed

Solution: run export NAMESRV_ADDR=ip:9876 (where ip is the NameServer's IP address), then execute the mqadmin commands.
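A minimal sketch of this fix, assuming a NameServer at the placeholder address 192.168.1.1:9876. The script only invokes mqadmin if it is present and executable in the current directory, so nothing is executed elsewhere; clusterList is used here because it is a read-only query.

```shell
#!/bin/sh
# Placeholder address; replace with your NameServer's ip:port.
export NAMESRV_ADDR="192.168.1.1:9876"

# Run a read-only command if mqadmin is available in the current directory.
if [ -x ./mqadmin ]; then
  ./mqadmin clusterList
else
  echo "NAMESRV_ADDR=$NAMESRV_ADDR (mqadmin not found here, nothing executed)"
fi
```

The export only lasts for the current shell session, so it has to be repeated or added to the shell profile for later sessions.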

3.2 RocketMQ consumer cannot consume because producer and consumer use different versions

Problem description: a producer sends messages; consumer A can consume them, but consumer B cannot, and the RocketMQ console prints:

Not found the consumer group consume stats, because return offset table is empty, maybe the consumer not consume any message.

Solution: make sure that the producer and the consumers use the same version of rocketmq-client.

3.3 Consumer cannot consume oldest message, when a new consumer group is added.

Problem description: when a new consumer group starts, it consumes from the current offset and does not fetch the oldest messages.

Solution: RocketMQ's default policy is to consume from the latest offset, that is, to skip the oldest messages. To consume the oldest messages, set org.apache.rocketmq.client.consumer.DefaultMQPushConsumer#setConsumeFromWhere. The following are three common configurations:

  • default configuration: a new consumer group consumes from the latest position at its first startup and from the last consumed offset at subsequent startups, that is, it skips the oldest messages;
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
  • a new consumer group consumes from the oldest position at its first startup and from the last consumed offset at subsequent startups, that is, it consumes the unexpired messages;
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
  • a new consumer group consumes from a specified timestamp at its first startup and from the last consumed offset at subsequent startups; used together with consumer.setConsumeTimestamp(), which defaults to half an hour earlier;
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);

3.4 How to enable consume from Slave

In some cases, a consumer needs to reset its offset to a day or two earlier. If the Master Broker has limited memory, reading that far back puts a high IO load on its CommitLog and affects reads and writes of other messages on this Broker. When slaveReadEnable=true is set and the consumer's offset falls behind by more than accessMessageInMemoryMaxRatio=40%, the Master Broker recommends that the consumer consume from the Slave Broker, which lowers the IO load on the Master Broker.

3.5 Performance tuning

A spin lock is recommended for asynchronous disk flush and a reentrant lock for synchronous disk flush; the configuration item is useReentrantLockWhenPutMessage, and its default is false. Enabling TransientStorePoolEnable is recommended with asynchronous disk flush. Disabling transferMsgByHeap is recommended to improve fetch efficiency. With synchronous disk flush, set sendMessageThreadPoolNums slightly larger.
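As a sketch, the recommendations for asynchronous disk flush could be collected into a Broker configuration fragment like the one generated below. The key flushDiskType, the lower-case spelling transientStorePoolEnable, and the use of a temporary file are assumptions, so compare them with the configuration of your Broker version before use. No value is given for sendMessageThreadPoolNums, since it applies to synchronous flush and the text names no number.

```shell
#!/bin/sh
# Write an illustrative tuning fragment for asynchronous disk flush.
out=$(mktemp)
cat > "$out" <<'EOF'
# asynchronous flush profile (illustrative values)
flushDiskType=ASYNC_FLUSH
# false keeps the default spin lock
useReentrantLockWhenPutMessage=false
transientStorePoolEnable=true
transferMsgByHeap=false
EOF

# Show the fragment; merge it into the Broker's properties file by hand.
cat "$out"
```

For synchronous disk flush the lock setting would flip to useReentrantLockWhenPutMessage=true, following the recommendation above.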

3.6 The meaning and difference between msgId and offsetMsgId in RocketMQ

After sending a message with the RocketMQ SDK, you will usually see a log line like the following:

SendResult [sendStatus=SEND_OK, msgId=0A42333A0DC818B4AAC246C290FD0000, offsetMsgId=0A42333A00002A9F000000000134F1F5, messageQueue=MessageQueue [topic=topicTest1, BrokerName=mac.local, queueId=3], queueOffset=4]
  • msgId is generated by the producer SDK. Specifically, it calls MessageClientIDSetter.createUniqIDBuffer() to generate a unique ID;
  • offsetMsgId is generated by the Broker (format is "IP address + port + CommitLog offset"). It is the messageId that the RocketMQ console takes as input.
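The layout of offsetMsgId can be checked by hand. The sketch below assumes the 32-hex-character form shown in the log line above, read as 8 hex digits of IPv4 address, 8 of port, and 16 of CommitLog offset, and decodes the sample value. The script and its variable names are illustrative and not part of RocketMQ.

```shell
#!/bin/sh
# Sample offsetMsgId taken from the SendResult log line above.
OFFSET_MSG_ID="0A42333A00002A9F000000000134F1F5"

# Split into the three assumed fields: address, port, CommitLog offset.
ip_hex=$(printf '%s' "$OFFSET_MSG_ID" | cut -c1-8)
port_hex=$(printf '%s' "$OFFSET_MSG_ID" | cut -c9-16)
offset_hex=$(printf '%s' "$OFFSET_MSG_ID" | cut -c17-32)

# Each pair of hex digits in the address field is one octet.
o1=$(printf '%s' "$ip_hex" | cut -c1-2)
o2=$(printf '%s' "$ip_hex" | cut -c3-4)
o3=$(printf '%s' "$ip_hex" | cut -c5-6)
o4=$(printf '%s' "$ip_hex" | cut -c7-8)

# Convert hex to decimal with shell arithmetic.
IP="$((0x$o1)).$((0x$o2)).$((0x$o3)).$((0x$o4))"
PORT=$((0x$port_hex))
OFFSET=$((0x$offset_hex))

echo "broker=$IP:$PORT commitLogOffset=$OFFSET"
```

For the sample value this prints broker=10.66.51.58:10911 commitLogOffset=20247029; the port matches the default Broker port 10911 mentioned in chapter 2.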