
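Introduction

  NameServer is a very simple Topic route registry. Its role is similar to that of ZooKeeper in Dubbo, and it supports dynamic registration and discovery of Brokers. It has two main functions. Broker management: NameServer accepts registration information from the Broker cluster and stores it as the basic data for routing, and it provides a heartbeat mechanism to check whether each Broker is still alive. Routing information management: each NameServer holds the entire routing information of the Broker cluster and the queue information used for client queries. Producers and Consumers can then learn the routing information of the whole Broker cluster through NameServer in order to send and consume messages.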



1 Deploy cluster

1.1 Single Master mode

This mode is risky: when the Broker restarts or goes down, the whole service becomes unavailable. It is not recommended for production environments, but it can be used for local testing.

1)Start NameServer
### Start Name Server
$ nohup sh mqnamesrv &
 
### check whether Name Server is successfully started
$ tail -f ~/logs/rocketmqlogs/namesrv.log
The Name Server boot success...
2)Start Broker
### start Broker
$ nohup sh bin/mqbroker -n localhost:9876 &

Check whether the Broker has started successfully, e.g. the Broker's IP is 192.168.1.2 and the Broker's name is broker-a:

$ tail -f ~/logs/rocketmqlogs/Broker.log
The broker[broker-a, 192.168.1.2:10911] boot success...

1.2 Multi Master mode

The cluster contains only Master nodes and no Slave nodes, e.g. 2 or 3 Master nodes. The advantages and disadvantages of this mode are as follows:

  • advantages: simple configuration; a single Master going down or restarting has no impact on the application. With RAID10 disks, messages are not lost even if the machine goes down and cannot be recovered, thanks to the high reliability of RAID10 (asynchronous flush loses a small number of messages, synchronous flush loses none). This mode has the highest performance.

  • disadvantages: while a machine is down, messages on it that have not yet been consumed cannot be subscribed to until it recovers, which affects message timeliness.

1)Start NameServer

NameServer must be started before the Brokers. In a production environment, we recommend starting 3 NameServer nodes for high availability. The startup command is the same on each node, as shown below:

### start Name Server
$ nohup sh mqnamesrv &
 
### check whether Name Server is successfully started
$ tail -f ~/logs/rocketmqlogs/namesrv.log
The Name Server boot success...
2)Start Broker cluster
### start the first Master on machine A, eg: NameServer's IP is 192.168.1.1
$ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-noslave/broker-a.properties &
 
### start the second Master on machine B, eg: NameServer's IP is 192.168.1.1
$ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-noslave/broker-b.properties &


The above commands apply to a single NameServer only. In a cluster with multiple NameServers, list all NameServer addresses after -n in the Broker start command, separated by semicolons.

1.3 Multi Master Multi Slave mode - async replication

Each Master node is paired with one Slave node, so the cluster consists of several Master-Slave groups. HA uses asynchronous replication, so the Slave lags behind the Master by a short time (milliseconds). The advantages and disadvantages of this mode are as follows:

  • advantages: very few messages are lost even if a disk is damaged, and message timeliness is not affected; Consumers can still consume from the Slave when the Master is down, which is transparent to the application and requires no human intervention; performance is almost the same as in Multi Master mode.

  • disadvantages: a small number of messages are lost when the Master goes down and its disk is damaged.

1)Start NameServer
### start Name Server
$ nohup sh mqnamesrv &
 
### check whether Name Server is successfully started
$ tail -f ~/logs/rocketmqlogs/namesrv.log
The Name Server boot success...
2)Start Broker cluster
### start first Master on machine A, eg: NameServer's IP is 192.168.1.1
$ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-async/broker-a.properties &
 
### start second Master on machine B, eg: NameServer's IP is 192.168.1.1
$ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-async/broker-b.properties &
 
### start first Slave on machine C, eg: NameServer's IP is 192.168.1.1
$ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-async/broker-a-s.properties &
 
### start second Slave on machine D, eg: NameServer's IP is 192.168.1.1
$ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-async/broker-b-s.properties &

1.4 Multi Master Multi Slave mode - synchronous double write

Each Master node is paired with one Slave node, so the cluster consists of several Master-Slave groups. HA uses synchronous double write: a write is reported to the application as successful only after both the Master and the Slave have written it. The advantages and disadvantages of this mode are as follows:

  • advantages: neither data nor service has a single point of failure; messages are not delayed even if the Master is down; both service availability and data availability are very high.

  • disadvantages: performance is about 10% lower than in async replication mode and the send latency is slightly higher; in the current version, there is no automatic Master-Slave switch when the Master goes down.

1)Start NameServer
### start Name Server
$ nohup sh mqnamesrv &
 
### check whether Name Server is successfully started
$ tail -f ~/logs/rocketmqlogs/namesrv.log
The Name Server boot success...
2)Start Broker cluster
### start first Master on machine A, eg:NameServer's IP is 192.168.1.1
$ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-a.properties &
 
### start second Master on machine B, eg:NameServer's IP is 192.168.1.1
$ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-b.properties &
 
### start first Slave on machine C, eg: NameServer's IP is 192.168.1.1
$ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-a-s.properties &
 
### start second Slave on machine D, eg: NameServer's IP is 192.168.1.1
$ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-b-s.properties &

The Brokers above are paired with their Slaves by using the same BrokerName. The Master's BrokerId must be 0 and a Slave's BrokerId must be greater than 0. A Master can have multiple Slaves, each with a distinct BrokerId. $ROCKETMQ_HOME is RocketMQ's installation directory; the user needs to set this environment variable.
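The pairing rule can be written down as a small consistency check. The following is only a sketch under stated assumptions: BrokerEntry and BrokerGroupCheck are hypothetical names that do not exist in RocketMQ, and treating a group without a Master as a problem is a choice made for this example, not something the text requires.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of one broker config; only the two fields named in the text.
class BrokerEntry {
    final String brokerName;
    final long brokerId;

    BrokerEntry(String brokerName, long brokerId) {
        this.brokerName = brokerName;
        this.brokerId = brokerId;
    }
}

class BrokerGroupCheck {
    // Groups entries by brokerName and returns one message per rule violation.
    static List<String> findProblems(List<BrokerEntry> entries) {
        Map<String, List<Long>> idsByName = new TreeMap<>();
        for (BrokerEntry e : entries) {
            idsByName.computeIfAbsent(e.brokerName, k -> new ArrayList<>()).add(e.brokerId);
        }
        List<String> problems = new ArrayList<>();
        for (Map.Entry<String, List<Long>> group : idsByName.entrySet()) {
            List<Long> ids = group.getValue();
            if (ids.stream().anyMatch(id -> id < 0)) {
                problems.add(group.getKey() + ": brokerId must be 0 (Master) or greater than 0 (Slave)");
            }
            if (ids.stream().distinct().count() != ids.size()) {
                problems.add(group.getKey() + ": brokerId values must be distinct within a group");
            }
            if (!ids.contains(0L)) {
                // Assumption of this sketch: every group is expected to have a Master.
                problems.add(group.getKey() + ": no Master (brokerId 0) in this group");
            }
        }
        return problems;
    }
}
```

For the 2m-2s layouts above, the entries would be (broker-a, 0), (broker-a, 1), (broker-b, 0) and (broker-b, 1), for which the check reports no problems.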

2 mqadmin management tool

Notes:

  1. Command format: ./mqadmin {command} {args}
  2. Almost all commands need -n to specify the NameServer address, in the format ip:port
  3. Almost all commands print help info with -h
  4. If a command accepts both a Broker address (-b) and a cluster name (-c), the Broker address takes priority. If no Broker address is given, the command is executed on all hosts in the cluster. Only one Broker address is supported. The -b format is ip:port, and the default port is 10911
  5. There are many commands under tools, but only the commands initialized in MQAdminStartup can be used. You can modify this class to add or define your own commands.
  6. Because of version updates, a few commands may be out of date. If you run into an error, please refer to the source code directly.

2.1 Topic

| name | meaning | option | explanation |
| --- | --- | --- | --- |
| updateTopic | create or update a Topic's configuration | -b | Broker address, the Broker on which the topic is located; only a single Broker is supported; format is ip:port |
| | | -c | cluster name, the cluster the topic belongs to (query cluster info with clusterList) |
| | | -h | print help info |
| | | -n | NameServer address, format is ip:port |
| | | -p | read/write permission for the new topic (W=2, R=4, WR=6) |
| | | -r | number of readable queues (default is 8) |
| | | -w | number of writable queues (default is 8) |
| | | -t | topic name (must match ^[a-zA-Z0-9_-]+$ ) |
| deleteTopic | delete a Topic | -c | cluster name, the cluster from which the topic will be deleted (query cluster info with clusterList) |
| | | -h | print help info |
| | | -n | NameServer address, format is ip:port |
| | | -t | topic name (must match ^[a-zA-Z0-9_-]+$ ) |
| topicList | query the Topic list | -h | print help info |
| | | -c | without -c, only the topic list is returned; with -c, cluster name, topic name and consumer group name are returned |
| | | -n | NameServer address, format is ip:port |
| topicRoute | query a Topic's route info | -t | topic name |
| | | -h | print help info |
| | | -n | NameServer address, format is ip:port |
| topicStatus | query a Topic's offsets | -t | topic name |
| | | -h | print help info |
| | | -n | NameServer address, format is ip:port |
| topicClusterList | query the list of clusters a Topic belongs to | -t | topic name |
| | | -h | print help info |
| | | -n | NameServer address, format is ip:port |
| updateTopicPerm | update a Topic's produce and consume permission | -t | topic name |
| | | -h | print help info |
| | | -n | NameServer address, format is ip:port |
| | | -b | Broker address the topic belongs to; only a single Broker is supported; format is ip:port |
| | | -p | read/write permission for the topic (W=2, R=4, WR=6) |
| | | -c | cluster name, the cluster the topic belongs to (query cluster info with clusterList); without -b, the command is executed on all Brokers |
| updateOrderConf | create, delete or get the kv config of a specified namespace from NameServer; not enabled at present | -h | print help info |
| | | -n | NameServer address, format is ip:port |
| | | -t | topic, the key |
| | | -v | orderConf, the value |
| | | -m | method: get, put or delete |
| allocateMQ | calculate the rebalance result for a list of consumers using the average rebalance algorithm | -t | topic name |
| | | -h | print help info |
| | | -n | NameServer address, format is ip:port |
| | | -i | ipList, separated by commas; calculates which topic queues each ip will load |
| statsAll | print a Topic's subscription info, TPS, message backlog size, read and write counts over the last 24h, etc. | -h | print help info |
| | | -n | NameServer address, format is ip:port |
| | | -a | whether to print only active topics |
| | | -t | specify a topic |

2.2 Cluster

| name | meaning | option | explanation |
| --- | --- | --- | --- |
| clusterList | query cluster info, including cluster, BrokerName, BrokerId, TPS, etc. | -m | print more info (e.g. #InTotalYest, #OutTotalYest, #InTotalToday, #OutTotalToday) |
| | | -h | print help info |
| | | -n | NameServer address, format is ip:port |
| | | -i | print interval, in seconds |
| clusterRT | send messages to measure the RT of each Broker in the cluster; messages are sent to the ${BrokerName} Topic | -a | amount, the number of probes; RT = total time / amount |
| | | -s | message size, in bytes |
| | | -c | the cluster to probe |
| | | -p | whether to print a formatted log, separated by the vertical bar character; not printed by default |
| | | -h | print help info |
| | | -m | the machine room it belongs to, used only for printing |
| | | -i | send interval, in seconds |
| | | -n | NameServer address, format is ip:port |

2.3 Broker

| name | meaning | option | explanation |
| --- | --- | --- | --- |
| updateBrokerConfig | update a Broker's config file; this modifies Broker.conf | -b | Broker address, format is ip:port |
| | | -c | cluster name |
| | | -k | key |
| | | -v | value |
| | | -h | print help info |
| | | -n | NameServer address, format is ip:port |
| brokerStatus | get a Broker's statistics and running status (it contains almost everything you might want to know) | -b | Broker address, format is ip:port |
| | | -h | print help info |
| | | -n | NameServer address, format is ip:port |
| brokerConsumeStats | a Broker's consumption info, including Consume Offset, Broker Offset, Diff and Timestamp, ordered by Message Queue | -b | Broker address, format is ip:port |
| | | -t | request timeout |
| | | -l | diff threshold; entries are printed only when the diff exceeds it |
| | | -o | whether the topic is an ordered topic, generally false |
| | | -h | print help info |
| | | -n | NameServer address, format is ip:port |
| getBrokerConfig | get a Broker's config | -b | Broker address, format is ip:port |
| | | -n | NameServer address, format is ip:port |
| wipeWritePerm | revoke a Broker's write permission on NameServer | -b | Broker address, format is ip:port |
| | | -n | NameServer address, format is ip:port |
| | | -h | print help info |
| cleanExpiredCQ | clean a Broker's expired Consume Queues, which may be left behind after the queue count is decreased | -n | NameServer address, format is ip:port |
| | | -h | print help info |
| | | -b | Broker address, format is ip:port |
| | | -c | cluster name |
| cleanUnusedTopic | clean a Broker's unused Topics, which may be left behind after manual deletion, to release the memory occupied by their Consume Queues | -n | NameServer address, format is ip:port |
| | | -h | print help info |
| | | -b | Broker address, format is ip:port |
| | | -c | cluster name |
| sendMsgStatus | send messages to a Broker and return the send status and RT | -n | NameServer address, format is ip:port |
| | | -h | print help info |
| | | -b | BrokerName (note: this is not the Broker address) |
| | | -s | message size, in bytes |
| | | -c | send count |

2.4 Message

| name | meaning | option | explanation |
| --- | --- | --- | --- |
| queryMsgById | query a message by offsetMsgId; the open-source console also expects offsetMsgId; see QueryMsgByIdSubCommand for details | -i | msgId |
| | | -h | print help info |
| | | -n | NameServer address, format is ip:port |
| queryMsgByKey | query messages by message Key | -k | msgKey |
| | | -t | topic name |
| | | -h | print help info |
| | | -n | NameServer address, format is ip:port |
| queryMsgByOffset | query a message by Offset | -b | Broker name (not the Broker address; Broker names can be queried with clusterList) |
| | | -i | queue id to query |
| | | -o | offset value |
| | | -t | topic name |
| | | -h | print help info |
| | | -n | NameServer address, format is ip:port |
| queryMsgByUniqueKey | query by msgId; msgId is different from offsetMsgId, see Frequently asked questions about operations for details; use -g and -d together to have the specified consumer return its consume result | -h | print help info |
| | | -n | NameServer address, format is ip:port |
| | | -i | unique msg id |
| | | -g | consumerGroup |
| | | -d | clientId |
| | | -t | topic name |
| checkMsgSendRT | measure the RT of sending messages to a topic, similar to clusterRT | -h | print help info |
| | | -n | NameServer address, format is ip:port |
| | | -t | topic name |
| | | -a | number of probes |
| | | -s | message size |
| sendMessage | send a message; it can also be sent to a specified Message Queue | -h | print help info |
| | | -n | NameServer address, format is ip:port |
| | | -t | topic name |
| | | -p | body, the message body |
| | | -k | keys |
| | | -c | tags |
| | | -b | BrokerName |
| | | -i | queueId |
| consumeMessage | consume messages; the consume logic differs depending on offset, start and end timestamps, and message queue; see ConsumeMessageCommand for details | -h | print help info |
| | | -n | NameServer address, format is ip:port |
| | | -t | topic name |
| | | -b | BrokerName |
| | | -o | offset from which to start consuming |
| | | -i | queueId |
| | | -g | consumer group |
| | | -s | start timestamp; see -h for the format |
| | | -d | end timestamp |
| | | -c | number of messages to consume |
| printMsg | consume and print messages from a Broker; a time range is supported | -h | print help info |
| | | -n | NameServer address, format is ip:port |
| | | -t | topic name |
| | | -c | charset, e.g. UTF-8 |
| | | -s | subExpress, the filter expression |
| | | -b | start timestamp; see -h for the format |
| | | -e | end timestamp |
| | | -d | whether to print the message body |
| printMsgByQueue | similar to printMsg, but a Message Queue must be specified | -h | print help info |
| | | -n | NameServer address, format is ip:port |
| | | -t | topic name |
| | | -i | queueId |
| | | -a | BrokerName |
| | | -c | charset, e.g. UTF-8 |
| | | -s | subExpress, the filter expression |
| | | -b | start timestamp; see -h for the format |
| | | -e | end timestamp |
| | | -p | whether to print the message |
| | | -d | whether to print the message body |
| | | -f | whether to count and print tags |
| resetOffsetByTime | reset offsets by timestamp; both the Broker and the consumers are reset | -h | print help info |
| | | -n | NameServer address, format is ip:port |
| | | -g | consumer group |
| | | -t | topic name |
| | | -s | reset to the offset corresponding to this timestamp |
| | | -f | whether to force the reset; if false, only moving the offset backward is supported; if true, the relationship between the timestamp's offset and the consume offset is ignored |
| | | -c | whether to reset the C++ SDK's offset |

2.5 Consumer, Consumer Group

| name | meaning | option | explanation |
| --- | --- | --- | --- |
| consumerProgress | query the subscription status of a consumer group; can show the message backlog of a specific client IP | -g | consumer group name |
| | | -s | whether to print client IPs |
| | | -h | print help info |
| | | -n | NameServer address, format is ip:port |
| consumerStatus | query consumer status, including message backlog and the consumer's jstack result (see ConsumerStatusSubCommand) | -h | print help info |
| | | -n | NameServer address, format is ip:port |
| | | -g | consumer group |
| | | -i | clientId |
| | | -s | whether to run jstack |
| updateSubGroup | create or update subscription info | -n | NameServer address, format is ip:port |
| | | -h | print help info |
| | | -b | Broker address |
| | | -c | cluster name |
| | | -g | consumer group name |
| | | -s | whether the consumer group is allowed to consume |
| | | -m | whether to start consuming from the minimum offset |
| | | -d | whether broadcast mode is used |
| | | -q | capacity of the retry queue |
| | | -r | max retry count |
| | | -i | effective when slaveReadEnable is on and the consumer is not yet consuming from a Slave; a Slave id can be specified here to consume from that Slave |
| | | -w | if the Broker suggests consuming from a Slave, this setting decides which Slave, given as a BrokerId, e.g. 1 |
| | | -a | whether to notify other consumers to rebalance when the number of consumers changes |
| deleteSubGroup | delete subscription info from a Broker | -n | NameServer address, format is ip:port |
| | | -h | print help info |
| | | -b | Broker address |
| | | -c | cluster name |
| | | -g | consumer group name |
| cloneGroupOffset | apply the source group's offsets to the target group | -n | NameServer address, format is ip:port |
| | | -h | print help info |
| | | -s | source consumer group |
| | | -d | target consumer group |
| | | -t | topic name |
| | | -o | not used at present |

2.6 Connection

| name | meaning | option | explanation |
| --- | --- | --- | --- |
| consumerConnection | query a Consumer's connections | -g | consumer group name |
| | | -n | NameServer address, format is ip:port |
| | | -h | print help info |
| producerConnection | query a Producer's connections | -g | producer group name |
| | | -t | topic name |
| | | -n | NameServer address, format is ip:port |
| | | -h | print help info |

2.7 NameServer

| name | meaning | option | explanation |
| --- | --- | --- | --- |
| updateKvConfig | update NameServer's kv config, not used at present | -s | namespace |
| | | -k | key |
| | | -v | value |
| | | -n | NameServer address, format is ip:port |
| | | -h | print help info |
| deleteKvConfig | delete NameServer's kv config | -s | namespace |
| | | -k | key |
| | | -n | NameServer address, format is ip:port |
| | | -h | print help info |
| getNamesrvConfig | get NameServer's config | -n | NameServer address, format is ip:port |
| | | -h | print help info |
| updateNamesrvConfig | modify NameServer's config | -n | NameServer address, format is ip:port |
| | | -h | print help info |
| | | -k | key |
| | | -v | value |

2.8 Other

| name | meaning | option | explanation |
| --- | --- | --- | --- |
| startMonitoring | start the monitoring process, which monitors message deletion and the number of retried messages in the queue | -n | NameServer address, format is ip:port |
| | | -h | print help info |

3 Frequently asked questions about operations

3.1 RocketMQ's mqadmin command error

Problem description: after deploying a RocketMQ cluster, running mqadmin throws the following exception.

org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to  failed

Solution: run export NAMESRV_ADDR=ip:9876 (where ip is the NameServer's IP address), then run the mqadmin command again.

3.2 Consumer cannot consume because the producer and consumer use different versions

Problem description: one producer sends messages; consumer A can consume them but consumer B cannot, and the RocketMQ console prints:

Not found the consumer group consume stats, because return offset table is empty, maybe the consumer not consume any message.

Solution: make sure that the producer and the consumers use the same version of rocketmq-client.

3.3 A new consumer group cannot consume the oldest messages

Problem description: when a new consumer group starts, it consumes from the current offset and does not fetch the oldest messages.

Solution: RocketMQ's default policy is to consume from the latest position, that is, to skip the oldest messages. To consume the oldest messages, set org.apache.rocketmq.client.consumer.DefaultMQPushConsumer#setConsumeFromWhere. The three common configurations are:

  • default configuration: a new consumer group consumes from the latest position at its first startup and from the last saved offset at later startups, that is, it skips the oldest messages;
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
  • a new consumer group consumes from the oldest position at its first startup and from the last saved offset at later startups, that is, it consumes all messages that have not expired;
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
  • a new consumer group consumes from a specified timestamp at its first startup and from the last saved offset at later startups; this is used together with consumer.setConsumeTimestamp(), whose default is half an hour earlier;
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
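
The three policies differ only in where a brand-new group starts. The following self-contained sketch models that decision without the RocketMQ client; StartPolicy and StartOffsetModel are hypothetical names, the list of store timestamps stands in for a broker-side lookup, and the real client handles further special cases that are left out here.

```java
import java.util.List;

// Simplified stand-in for the three ConsumeFromWhere values discussed above.
enum StartPolicy { LAST_OFFSET, FIRST_OFFSET, TIMESTAMP }

class StartOffsetModel {
    /**
     * @param storedOffset     offset saved by an earlier run of the group, or -1 for a new group
     * @param minOffset        queue offset of the oldest message that has not expired
     * @param storeTimestamps  store time of each retained message, oldest first;
     *                         index i stands for queue offset minOffset + i
     * @param consumeTimestamp start time used by the TIMESTAMP policy
     */
    static long resolve(StartPolicy policy, long storedOffset, long minOffset,
                        List<Long> storeTimestamps, long consumeTimestamp) {
        if (storedOffset >= 0) {
            return storedOffset; // existing group: always resume from the saved offset
        }
        long maxOffset = minOffset + storeTimestamps.size();
        switch (policy) {
            case FIRST_OFFSET:
                return minOffset; // oldest message still retained
            case TIMESTAMP:
                for (int i = 0; i < storeTimestamps.size(); i++) {
                    if (storeTimestamps.get(i) >= consumeTimestamp) {
                        return minOffset + i; // first message stored at or after the timestamp
                    }
                }
                return maxOffset;
            case LAST_OFFSET:
            default:
                return maxOffset; // skip everything already in the queue
        }
    }
}
```

With three retained messages starting at queue offset 10, a new group starts at 13 under LAST_OFFSET and at 10 under FIRST_OFFSET, while a group with a saved offset of 12 resumes at 12 whichever policy is set.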

3.4 How to enable consume from Slave

In some cases, a consumer needs to reset its offset to one or two days earlier. If the Master Broker has limited memory, its CommitLog then carries a heavy IO load, which affects the reading and writing of other messages on this Broker. When slaveReadEnable=true is set and the gap between the consumer's offset and the latest CommitLog position exceeds accessMessageInMemoryMaxRatio=40%, the Master Broker recommends that the consumer consume from the Slave Broker, which lowers the IO load on the Master.
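
One way to read this rule is as a comparison between the consumer's lag in the CommitLog and a memory budget. The sketch below is a model of that reading, not the broker's code: SlaveReadModel is a hypothetical name, and taking the percentage of the machine's total physical memory is an assumption of this example.

```java
class SlaveReadModel {
    /**
     * Returns true when the Master would suggest that the consumer pull from a Slave.
     * All sizes and offsets are in bytes.
     */
    static boolean suggestSlave(boolean slaveReadEnable, long maxCommitLogOffset,
                                long pullingOffset, long totalPhysicalMemoryBytes,
                                int accessMessageInMemoryMaxRatio) {
        // How far the consumer is behind the newest data in the CommitLog.
        long lagBytes = maxCommitLogOffset - pullingOffset;
        // Assumption: the ratio is a percentage of total physical memory.
        long memoryBudget = (long) (totalPhysicalMemoryBytes * (accessMessageInMemoryMaxRatio / 100.0));
        return slaveReadEnable && lagBytes > memoryBudget;
    }
}
```

Under these assumptions, on a 16 GiB machine with the 40% default, a consumer lagging 10 GiB behind is redirected to the Slave, while one lagging 1 GiB keeps reading from the Master.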

3.5 Performance tuning

  • A spin lock is recommended for asynchronous disk flush and a reentrant lock for synchronous disk flush; the configuration item is useReentrantLockWhenPutMessage, and the default is false.
  • Enabling TransientStorePoolEnable is recommended when using asynchronous disk flush.
  • Disabling transferMsgByHeap is recommended to improve fetch efficiency.
  • Set sendMessageThreadPoolNums slightly higher when using synchronous disk flush.

3.6 The meaning and difference between msgId and offsetMsgId in RocketMQ

After sending a message with the RocketMQ SDK, you will usually see a log line like the following.

SendResult [sendStatus=SEND_OK, msgId=0A42333A0DC818B4AAC246C290FD0000, offsetMsgId=0A42333A00002A9F000000000134F1F5, messageQueue=MessageQueue [topic=topicTest1, BrokerName=mac.local, queueId=3], queueOffset=4]
  • msgId is generated by the producer SDK. Specifically, the method MessageClientIDSetter.createUniqIDBuffer() is called to generate a unique id;
  • offsetMsgId is generated by the Broker (the format is "IP address + port + CommitLog offset"). offsetMsgId is the message id that the RocketMQ console takes as input.
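
Because offsetMsgId is just those three fields written in hexadecimal, it can be taken apart by hand. The sketch below assumes an IPv4 Broker address, so that the 32 hex characters split into 8 for the IP, 8 for the port and 16 for the CommitLog offset; OffsetMsgIdDecoder is a hypothetical helper name, not a RocketMQ class.

```java
class OffsetMsgIdDecoder {
    final String ip;
    final int port;
    final long commitLogOffset;

    OffsetMsgIdDecoder(String ip, int port, long commitLogOffset) {
        this.ip = ip;
        this.port = port;
        this.commitLogOffset = commitLogOffset;
    }

    // Assumes the IPv4 layout: 4 bytes IP + 4 bytes port + 8 bytes offset, hex encoded.
    static OffsetMsgIdDecoder decode(String offsetMsgId) {
        if (offsetMsgId.length() != 32) {
            throw new IllegalArgumentException("expected 32 hex characters, got " + offsetMsgId.length());
        }
        StringBuilder ip = new StringBuilder();
        for (int i = 0; i < 8; i += 2) {
            if (i > 0) {
                ip.append('.');
            }
            // Each IP byte is two hex characters.
            ip.append(Integer.parseInt(offsetMsgId.substring(i, i + 2), 16));
        }
        int port = Integer.parseUnsignedInt(offsetMsgId.substring(8, 16), 16);
        long offset = Long.parseUnsignedLong(offsetMsgId.substring(16, 32), 16);
        return new OffsetMsgIdDecoder(ip.toString(), port, offset);
    }
}
```

Decoding the offsetMsgId from the log line above, 0A42333A00002A9F000000000134F1F5, gives the address 10.66.51.58, port 10911 and CommitLog offset 20247029.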


Preface

This document explains how to build and deploy a RocketMQ cluster with automatic failover based on DLedger.

For details on deploying a new cluster or upgrading an old one, please refer to the Deployment Guide.

1. Build from source code

The build has two steps: first build DLedger, then build RocketMQ.

1.1 Build DLedger

git clone https://github.com/openmessaging/openmessaging-storage-dledger.git

cd openmessaging-storage-dledger

mvn clean install -DskipTests

1.2 Build RocketMQ

git clone https://github.com/apache/rocketmq.git

cd rocketmq

git checkout -b store_with_dledger origin/store_with_dledger

mvn -Prelease-all -DskipTests clean install -U

2. Quick Deployment

After the build succeeds:

cd distribution/target/apache-rocketmq

sh bin/dledger/fast-try.sh start

If the above commands run successfully, check the cluster status with the mqadmin command:

sh bin/mqadmin clusterList -n 127.0.0.1:9876

If everything goes well, output like the following appears:

[Figure: ClusterList output]

(A BID of 0 indicates the Master; the others are Followers.)

After a successful startup, producers can send messages, and you can then test the failover scenario.

To stop the cluster quickly, run the following command:

sh bin/dledger/fast-try.sh stop

For quick deployment, the default configuration is in the directory conf/dledger and the default storage path is /tmp/rmqstore.

3. Failover

After a successful deployment, kill the Leader process (in the example above, the process bound to port 30931). After about 10 seconds, check the cluster status with the clusterList command: the Leader has switched to another node.


Preface

This document explains how to deploy a RocketMQ-on-DLedger Group with automatic failover.

A RocketMQ-on-DLedger Group is a group of Brokers with the same name. It needs at least 3 nodes; a Leader is elected automatically by the Raft algorithm and the other nodes act as Followers. Data is replicated between the Leader and the Followers for high availability.

A RocketMQ-on-DLedger Group fails over automatically and keeps its data consistent.

RocketMQ-on-DLedger Groups scale horizontally, that is, any number of RocketMQ-on-DLedger Groups can be deployed to provide service together.

1. New cluster deployment

1.1 Write the configuration

Each RocketMQ-on-DLedger Group needs at least 3 machines (3 are assumed in this document).

Write 3 configuration files; the example configuration files in the directory conf/dledger are a good reference.

Key configuration items:

| name | meaning | example |
| --- | --- | --- |
| enableDLegerCommitLog | whether to enable DLedger | true |
| dLegerGroup | name of the DLedger Raft Group; keeping it the same as brokerName is recommended | RaftNode00 |
| dLegerPeers | address and port of every node in the DLedger Group; must be identical on all nodes of the same group | n0-127.0.0.1:40911;n1-127.0.0.1:40912;n2-127.0.0.1:40913 |
| dLegerSelfId | node id; must be one of the ids in dLegerPeers and unique within the group | n0 |
| sendMessageThreadPoolNums | number of send threads; setting it to the number of CPU cores is recommended | 16 |

The following is an example configuration, conf/dledger/broker-n0.conf.

brokerClusterName = RaftCluster
brokerName=RaftNode00
listenPort=30911
namesrvAddr=127.0.0.1:9876
storePathRootDir=/tmp/rmqstore/node00
storePathCommitLog=/tmp/rmqstore/node00/commitlog
enableDLegerCommitLog=true
dLegerGroup=RaftNode00
dLegerPeers=n0-127.0.0.1:40911;n1-127.0.0.1:40912;n2-127.0.0.1:40913
## must be unique
dLegerSelfId=n0
sendMessageThreadPoolNums=16
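
Two of these settings are easy to get wrong when three files are edited by hand: dLegerSelfId has to be one of the ids in dLegerPeers, and the group size decides whether failover is possible at all. The following sketch checks both; DLedgerPeersCheck is a hypothetical helper rather than part of RocketMQ or DLedger, and the tolerated-failure count uses the usual majority rule of Raft.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class DLedgerPeersCheck {
    // Parses "n0-127.0.0.1:40911;n1-127.0.0.1:40912" into id -> host:port, keeping order.
    static Map<String, String> parse(String dLegerPeers) {
        Map<String, String> peers = new LinkedHashMap<>();
        for (String raw : dLegerPeers.split(";")) {
            String entry = raw.trim();
            int dash = entry.indexOf('-'); // the id ends at the first dash
            if (dash <= 0 || dash == entry.length() - 1) {
                throw new IllegalArgumentException("expected id-host:port, got: " + entry);
            }
            String id = entry.substring(0, dash);
            if (peers.put(id, entry.substring(dash + 1)) != null) {
                throw new IllegalArgumentException("duplicate node id: " + id);
            }
        }
        return peers;
    }

    // Returns how many node failures the group tolerates while keeping a majority.
    static int toleratedFailures(String dLegerPeers, String dLegerSelfId) {
        Map<String, String> peers = parse(dLegerPeers);
        if (!peers.containsKey(dLegerSelfId)) {
            throw new IllegalArgumentException(dLegerSelfId + " is not listed in dLegerPeers");
        }
        return (peers.size() - 1) / 2;
    }
}
```

For the three-node example above the result is 1, while a two-node group yields 0, which matches the remark later in this document that two nodes can be deployed but cannot fail over.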

1.2 Start Broker

The startup command is the same as in the old version.

nohup sh bin/mqbroker -c conf/dledger/xxx-n0.conf &

nohup sh bin/mqbroker -c conf/dledger/xxx-n1.conf &

nohup sh bin/mqbroker -c conf/dledger/xxx-n2.conf &

2. Upgrade old cluster

If the old cluster was deployed in Master mode, each Master needs to be converted into a RocketMQ-on-DLedger Group.

If the old cluster was deployed in Master-Slave mode, each Master-Slave group needs to be converted into a RocketMQ-on-DLedger Group.

2.1 Kill old Broker

Run the kill command, or call bin/mqshutdown broker.

2.2 Check old Commitlog

Each node in a RocketMQ-on-DLedger Group is compatible with the old Commitlog, but the Raft replication process applies only to newly added messages. So, to avoid exceptions, the old Commitlog must be consistent across nodes.

If the old cluster was deployed in Master-Slave mode, the Commitlogs may be inconsistent after shutdown. We recommend using md5sum to check at least the 2 most recent Commitlog files; if they differ, make them consistent by copying.

Although a RocketMQ-on-DLedger Group can be deployed with 2 nodes, it then has no failover ability (at least 3 nodes are needed to tolerate the failure of one node).

Once the Commitlogs of the Master and the Slave are consistent, prepare 3 machines and copy the old Commitlog from the Master to these 3 machines (and copy the config directory as well).

Then go on to set the configuration.

2.3 Modify configuration

Refer to New cluster deployment.

2.4 Restart Broker

Refer to New cluster deployment.

Design

1 Message Store

1.1 The Architecture of Message Store

1.2 PageCache and Memory-Map(Mmap)

1.3 Message Flush

2 Communication Mechanism

2.1 The class diagram of Remoting module

2.2 The design of protocol and encode/decode

2.3 The three ways and process of message communication

2.4 The multi-thread design of Reactor

3 Message Filter

4 LoadBalancing

4.1 The loadBalance of Producer

4.2 The loadBalance of Consumer

5 Transactional Message

Apache RocketMQ has supported distributed transactional messages since version 4.3.0. RocketMQ implements transactional messages using a 2PC (two-phase commit) protocol, plus a compensation logic that handles timeouts and failures in the commit phase, as shown below.

5.1 The Process of RocketMQ Transactional Message

The picture above shows the overall architecture of transactional messages, including the sending of the message (commit-request phase), the sending of Commit/Rollback (commit phase) and the compensation process.

  1. The sending of message and Commit/Rollback.

    (1) Sending the message(named Half message in RocketMQ)

    (2) The server responds with the write result (success or failure) of the Half message.

    (3) Handle local transaction according to the result(local transaction won’t be executed when the result is failure).

    (4) Sending Commit/Rollback to broker according to the result of local transaction(Commit will generate message index and make the message visible to consumers).

  2. Compensation process

    (1) For a transactional message without a Commit/Rollback (that is, a message in pending status), a “back-check” request is initiated from the broker.

    (2) The Producer receives the “back-check” request and checks the status of the local transaction corresponding to the “back-check” message.

    (3) Redo Commit or Rollback based on local transaction status.

    The compensation phase is used to resolve the timeout or failure case of the message Commit or Rollback.

5.2 The design of RocketMQ Transactional Message

  1. Transactional messages are invisible to users in the first phase (commit-request phase)

In the main flow of a transactional message, the first-phase message is invisible to the user. This is also the biggest difference from a normal message. So how is the message written while remaining invisible to the user? RocketMQ's solution is as follows: if the message is a Half message, the topic and queueId of the original message are backed up, and the topic is then changed to RMQ_SYS_TRANS_HALF_TOPIC. Since the consumer group does not subscribe to this topic, the consumer cannot consume the Half message. RocketMQ then starts a timed task that pulls messages from RMQ_SYS_TRANS_HALF_TOPIC, obtains a channel according to the producer group, sends a back-check to query the local transaction status, and decides whether to commit or roll back the message according to that status.

In RocketMQ, the storage structure of the message in the broker is as follows. Each message has corresponding index information. The Consumer reads the content of the message through the secondary index of the ConsumeQueue. The flow is as follows:

RocketMQ's specific implementation strategy is: when a transactional message is written, the topic and queueId of the message are replaced, and the original topic and queueId are stored in the message's properties. Because the topic has been replaced, the message is not forwarded to the Consume Queue of the original topic, so the consumer cannot perceive the message and does not consume it. In fact, changing the topic is a conventional technique in RocketMQ (recall how delayed messages are implemented).

  2. Commit/Rollback operation and introduction of the Op message

After a message that is invisible to the user has been written in the first phase, there are two cases in the second phase. One is the Commit operation, after which the message must become visible to the user; the other is the Rollback operation, after which the first-phase message (Half message) must be revoked. For Rollback, since the first-phase message is itself invisible to the user, there is no need to actually revoke it (in fact, RocketMQ cannot really delete a message, because messages are stored in a sequentially written file). However, some operation is still needed to mark the final status of the message and distinguish it from a message in pending status. For this purpose the concept of the "Op message" is introduced: it means that the message has a definite status (Commit or Rollback). If a transactional message has no corresponding Op message, the status of the transaction is still undetermined (probably the second phase failed). RocketMQ records an Op message for every Half message, whether it is committed or rolled back. The only difference between Commit and Rollback is that on Commit, the index of the Half message is created before the Op message is written.

  3. How the Op message is stored and how it corresponds to the Half message

RocketMQ writes the Op message to a dedicated system topic (RMQ_SYS_TRANS_OP_HALF_TOPIC), which is created via the method TransactionalMessageUtil.buildOpTopic(). This topic is an internal topic (like RMQ_SYS_TRANS_HALF_TOPIC) and is not consumed by the user. The content of the Op message is the physical offset of the corresponding Half message. Through the Op message, the corresponding Half message can be located for the subsequent back-check operation.

  4. Index construction of Half messages

When the Commit operation of the second phase is performed, the index of the Half message must be built. Since the Half message was written to a special topic (RMQ_SYS_TRANS_HALF_TOPIC) in the first phase of 2PC, it must be read back from that topic when the index is built, its topic and queueId replaced with the real target topic and queueId, and then written again as a normal message that is visible to the user. In short, the second phase rebuilds a complete normal message from the content of the Half message stored in the first phase and then runs it through the normal message-writing process.
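
The topic swap in the first phase and its reversal on Commit can be modelled in a few lines. This is a sketch of the idea only: TxMsg and HalfMessageModel are hypothetical classes, the two property keys are placeholders picked for the example, and placing the Half message in queue 0 is an assumption.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, minimal message: destination plus free-form properties.
class TxMsg {
    final String topic;
    final int queueId;
    final Map<String, String> properties = new HashMap<>();

    TxMsg(String topic, int queueId) {
        this.topic = topic;
        this.queueId = queueId;
    }
}

class HalfMessageModel {
    static final String HALF_TOPIC = "RMQ_SYS_TRANS_HALF_TOPIC";
    // Placeholder property keys for this sketch.
    static final String REAL_TOPIC = "REAL_TOPIC";
    static final String REAL_QUEUE_ID = "REAL_QID";

    // Phase one: back up the real destination, then redirect to the Half topic.
    static TxMsg toHalf(TxMsg original) {
        TxMsg half = new TxMsg(HALF_TOPIC, 0); // queue 0 is an assumption
        half.properties.putAll(original.properties);
        half.properties.put(REAL_TOPIC, original.topic);
        half.properties.put(REAL_QUEUE_ID, String.valueOf(original.queueId));
        return half;
    }

    // Commit: rebuild a normal message from the stored Half message.
    static TxMsg restore(TxMsg half) {
        TxMsg normal = new TxMsg(half.properties.get(REAL_TOPIC),
                Integer.parseInt(half.properties.get(REAL_QUEUE_ID)));
        normal.properties.putAll(half.properties);
        normal.properties.remove(REAL_TOPIC);
        normal.properties.remove(REAL_QUEUE_ID);
        return normal;
    }
}
```

A consumer subscribed to the original topic never sees the result of toHalf, because it sits under a different topic; only the message produced by restore is written where that consumer can find it.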

  3. How to handle a message that failed in the second phase?

If the commit/rollback phase fails, for example because a network problem causes the Commit request to fail, some strategy is required to make sure the message is eventually resolved. RocketMQ uses a compensation mechanism called "back-check". The broker initiates a back-check request for each message in pending status and sends it to the corresponding producer side (a producer in the same producer group as the one that sent the Half message). The producer checks the status of its local transaction and then re-sends Commit or Rollback. The broker finds the pending messages by comparing the messages in RMQ_SYS_TRANS_HALF_TOPIC with those in RMQ_SYS_TRANS_OP_HALF_TOPIC, and advances the checkpoint (which records the transactional messages whose status is already determined).

RocketMQ does not back-check the status of a transactional message endlessly. The default maximum number of checks is 15. If the transaction status is still unknown after 15 checks, RocketMQ rolls back the message by default.
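The back-check logic above can be sketched as a small model. This is an illustration under assumptions, not the broker's code: the function names, the `check_counts` dictionary, and the `ask_producer` callback are hypothetical, and offsets are plain integers.

```python
MAX_CHECK_TIMES = 15  # default limit stated in the text


def pending_half_offsets(half_offsets, op_offsets):
    """Half messages with no Op message pointing at them are still undetermined."""
    resolved = set(op_offsets)
    return [off for off in half_offsets if off not in resolved]


def back_check(half_offsets, op_offsets, check_counts, ask_producer):
    """Run one back-check round.

    ask_producer(offset) is a hypothetical callback returning
    "COMMIT", "ROLLBACK" or "UNKNOWN".
    Returns {offset: decision} for the messages resolved in this round.
    """
    decisions = {}
    for off in pending_half_offsets(half_offsets, op_offsets):
        if check_counts.get(off, 0) >= MAX_CHECK_TIMES:
            # Limit reached: stop asking and roll back by default.
            decisions[off] = "ROLLBACK"
            continue
        check_counts[off] = check_counts.get(off, 0) + 1
        state = ask_producer(off)
        if state != "UNKNOWN":
            decisions[off] = state
    return decisions
```

In this model, each decision returned by `back_check` would be followed by writing an Op message for that offset, so the message no longer appears as pending in the next round.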

6 Message Query

6.1 Query messages by messageId

6.2 Query messages by message key

1 Producer

2 Consumer

3 Broker

3.1 Broker Role

3.2 FlushDiskType

3.3 Broker Configuration

Parameter name           Default                            Description
listenPort               10911                              listen port for client
namesrvAddr              null                               name server address
brokerIP1                InetAddress for network interface  Should be configured if having multiple addresses
brokerIP2                InetAddress for network interface  If configured for the Master broker in the Master/Slave cluster, slave broker will connect to this address for data synchronization
brokerName               null                               broker name
brokerClusterName        DefaultCluster                     this broker belongs to which cluster
brokerId                 0                                  broker id, 0 means master, positive integers mean slave
storePathCommitLog       $HOME/store/commitlog/             file path for commit log
storePathConsumerQueue   $HOME/store/consumequeue/          file path for consume queue
mappedFileSizeCommitLog  1024 * 1024 * 1024 (1G)            mapped file size for commit log
deleteWhen               04                                 When to delete the commitlog which is out of the reserve time
fileReservedTime         72                                 The number of hours to keep a commitlog before deleting it
brokerRole               ASYNC_MASTER                       SYNC_MASTER/ASYNC_MASTER/SLAVE
flushDiskType            ASYNC_FLUSH                        SYNC_FLUSH/ASYNC_FLUSH. A broker in SYNC_FLUSH mode flushes each message to disk before acknowledging the producer. A broker in ASYNC_FLUSH mode, on the other hand, takes advantage of group commit and achieves better performance.

Overview

This document focuses on how to quickly deploy and use a RocketMQ cluster that supports the privilege control feature.

1. Access control features

Access Control (ACL) mainly provides user access control at the Topic resource level for RocketMQ. To enable RocketMQ permission control, inject the AccessKey and SecretKey signature through the RPCHook on the Client side. The corresponding permission control attributes (including Topic access rights, the IP whitelist, and the AccessKey and SecretKey signature) are then set in the configuration file distribution/conf/plain_acl.yml. The Broker side checks the permissions owned by the AccessKey, and if the verification fails, an exception is thrown.

The source code for ACL on the Client side can be found in org.apache.rocketmq.example.simple.AclClient.java

2. Access control definition and attribute values

2.1 Access control definition

The definition of Topic resource access control for RocketMQ is mainly as shown in the following table.

Permission  Explanation
DENY        Permission denied
ANY         PUB or SUB permission
PUB         Publishing permission
SUB         Subscription permission

2.2 Main properties

Key                         Value                 Explanation
globalWhiteRemoteAddresses  string                Global IP whitelist, example: *; 192.168.*.*; 192.168.0.1
accessKey                   string                Access Key
secretKey                   string                Secret Key
whiteRemoteAddress          string                User IP whitelist, example: *; 192.168.*.*; 192.168.0.1
admin                       true;false            Whether the account is an administrator account
defaultTopicPerm            DENY;PUB;SUB;PUB|SUB  Default Topic permission
defaultGroupPerm            DENY;PUB;SUB;PUB|SUB  Default ConsumerGroup permission
topicPerms                  topic=permission      Permission for a specific Topic
groupPerms                  group=permission      Permission for a specific ConsumerGroup

For details, please refer to the distribution/conf/plain_acl.yml configuration file.

3. Cluster deployment with permission control

After defining the permission attributes in the distribution/conf/plain_acl.yml configuration file as described above, turn on the aclEnable switch to enable the ACL feature of the RocketMQ cluster. A broker configuration file with the ACL feature enabled looks as follows:

brokerClusterName=DefaultCluster
brokerName=broker-a
brokerId=0
deleteWhen=04
fileReservedTime=48
brokerRole=ASYNC_MASTER
flushDiskType=ASYNC_FLUSH
storePathRootDir=/data/rocketmq/rootdir-a-m
storePathCommitLog=/data/rocketmq/commitlog-a-m
autoCreateSubscriptionGroup=true
## if acl is open,the flag will be true
aclEnable=true
listenPort=10911
brokerIP1=XX.XX.XX.XX1
namesrvAddr=XX.XX.XX.XX:9876

4. Main process of access control

The main ACL process is divided into two parts: privilege resolution and privilege check.

4.1 Privilege resolution

The Broker side parses the client's RequestCommand request and obtains the attribute fields that need to be authenticated.

Main attributes:

(1) AccessKey: similar to a user name; it represents the user entity, and the permission data is associated with it.

(2) Signature: the client computes a signature string using the SecretKey, and the server uses the SecretKey to verify that signature.
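The sign-then-verify idea can be sketched as follows. This is a minimal illustration, not RocketMQ's wire format: the use of HMAC-SHA1 with Base64 encoding and the shape of the signed content are assumptions made for the example, and the function names are hypothetical.

```python
import base64
import hashlib
import hmac


def sign(secret_key: str, content: bytes) -> str:
    """Client side: derive a signature from the request content and the SecretKey."""
    digest = hmac.new(secret_key.encode("utf-8"), content, hashlib.sha1).digest()
    return base64.b64encode(digest).decode("ascii")


def verify(secret_key: str, content: bytes, signature: str) -> bool:
    """Server side: recompute the signature with the stored SecretKey and compare."""
    return hmac.compare_digest(sign(secret_key, content), signature)
```

Because only the signature travels with the request, the SecretKey itself never has to be sent over the network; the Broker looks it up by AccessKey.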

4.2 Privilege check

The permission check logic on the Broker side is divided into the following steps:

(1) Check whether the global IP whitelist is hit; if yes, the check passes; otherwise, go to step (2).

(2) Check whether the user IP whitelist is hit; if yes, the check passes; otherwise, go to step (3).

(3) Check the signature; if the verification fails, throw an exception; if the verification passes, go to step (4).

(4) Compare the permissions required by the user request with the permissions owned by the user; if the owned permissions do not cover the required ones, throw an exception.

When verifying the permissions required by a request, note the following points:

(1) Special requests such as UPDATE_AND_CREATE_TOPIC can only be performed by an admin account.

(2) For a resource, if a permission is explicitly configured, the configured permission is used; otherwise, the default permission is used.
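The four steps and the two notes above can be modelled in a short sketch. It is a simplified illustration: the `Account` class, the function names, and the use of shell-style wildcard matching for IP patterns are assumptions, only Topic permissions are modelled, and the ANY permission is left out.

```python
from dataclasses import dataclass, field
from fnmatch import fnmatchcase


class AclException(Exception):
    """Raised when a request fails the permission check."""


@dataclass
class Account:
    access_key: str
    white_remote_address: str = ""  # user IP whitelist pattern, e.g. "192.168.*.*"
    admin: bool = False
    default_topic_perm: str = "DENY"
    topic_perms: dict = field(default_factory=dict)  # topic -> "DENY", "PUB", "SUB" or "PUB|SUB"


def ip_matches(pattern, ip):
    # "*" wildcards as in the whitelist examples; an empty pattern matches nothing.
    return bool(pattern) and fnmatchcase(ip, pattern)


def allows(owned, needed):
    # owned is e.g. "PUB|SUB"; needed is "PUB" or "SUB".
    return owned != "DENY" and needed in owned.split("|")


def check(global_whitelist, account, ip, signature_ok, topic, needed, admin_only=False):
    # Step 1: global IP whitelist.
    if any(ip_matches(p, ip) for p in global_whitelist):
        return
    # Step 2: user IP whitelist.
    if ip_matches(account.white_remote_address, ip):
        return
    # Step 3: signature verification (result computed elsewhere).
    if not signature_ok:
        raise AclException("signature verification failed")
    # Step 4: required vs. owned permissions.
    if admin_only and not account.admin:
        raise AclException("request requires an admin account")
    # Explicit per-topic permission wins; otherwise fall back to the default.
    owned = account.topic_perms.get(topic, account.default_topic_perm)
    if not allows(owned, needed):
        raise AclException(f"{needed} on {topic} denied for {account.access_key}")


def is_allowed(*args, **kwargs):
    """Convenience wrapper that turns the exception into a boolean."""
    try:
        check(*args, **kwargs)
    except AclException:
        return False
    return True
```

Note that in this model a whitelist hit short-circuits the signature and permission checks, exactly as steps (1) and (2) describe, so whitelist patterns should be kept as narrow as possible.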

5. Hot loading modified Access control

The default implementation of RocketMQ's permission control store is based on the yml configuration file. Users can dynamically modify the properties defined for permission control without restarting the Broker service node.
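One common way to pick up such changes without a restart is to reload the file whenever its modification time changes. The sketch below shows that general technique only; the text does not describe how the Broker detects changes, so the `ReloadingConfig` class and its polling approach are assumptions, and the `parse` callback stands in for a real yml parser.

```python
import os


class ReloadingConfig:
    """Reload a config file when its modification time changes (illustrative)."""

    def __init__(self, path, parse):
        self.path = path
        self.parse = parse  # callable: file text -> parsed config
        self._mtime = None
        self.value = None
        self.refresh()

    def refresh(self):
        """Re-read the file if it changed; return True when a reload happened."""
        mtime = os.stat(self.path).st_mtime_ns
        if mtime == self._mtime:
            return False
        with open(self.path, encoding="utf-8") as f:
            self.value = self.parse(f.read())
        self._mtime = mtime
        return True
```

A caller would invoke `refresh()` periodically, for example from a background timer, and read `value` whenever a permission decision is needed.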