
Client Configuration

Relative to the RocketMQ Broker cluster, producers and consumers are clients. This section describes the common behavior configuration of producers and consumers.

1 Client Addressing mode

A RocketMQ client first finds the Name Server and then finds the Broker through the Name Server. The following configuration methods are listed from highest to lowest priority; a higher-priority configuration overrides a lower-priority one.

  • Specify the Name Server address in code; multiple Name Server addresses are separated by semicolons
producer.setNamesrvAddr("192.168.0.1:9876;192.168.0.2:9876");

consumer.setNamesrvAddr("192.168.0.1:9876;192.168.0.2:9876");

  • Specify the Name Server address as a Java startup parameter (quote the value, because ';' separates commands in the shell)
-Drocketmq.namesrv.addr="192.168.0.1:9876;192.168.0.2:9876"
  • Specify the Name Server address in an environment variable
export NAMESRV_ADDR="192.168.0.1:9876;192.168.0.2:9876"
  • HTTP static server addressing (default)

After the client starts, it accesses an HTTP static server address such as http://jmenv.tbsite.net:8080/rocketmq/nsaddr. This URL returns content like the following:

192.168.0.1:9876;192.168.0.2:9876   

By default, the client accesses the HTTP server every 2 minutes and updates its local Name Server address. The URL is hardcoded in the code, so you can change the target server by editing the /etc/hosts file, for example by adding the following entry:

10.232.22.67    jmenv.tbsite.net

HTTP static server addressing is recommended because it keeps client deployment simple and allows the Name Server cluster to be upgraded without restarting clients (hot upgrade).
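The priority order above can be sketched in plain Java. The class NamesrvAddrResolver and its methods are hypothetical helpers, not part of the RocketMQ client. The HTTP lookup is passed in as a Supplier so that, as described above, the static server is only consulted when no higher-priority source is set.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
import java.util.stream.Collectors;

// Hypothetical helper (not part of the RocketMQ client API) that mirrors the
// documented priority order for locating Name Servers.
final class NamesrvAddrResolver {
    private NamesrvAddrResolver() {}

    // Split a "host:port;host:port" list into individual addresses.
    static List<String> parse(String addrList) {
        if (addrList == null || addrList.trim().isEmpty()) {
            return Collections.emptyList();
        }
        return Arrays.stream(addrList.split(";"))
                .map(String::trim)
                .filter(s -> !s.isEmpty())
                .collect(Collectors.toList());
    }

    // The first non-empty source wins:
    // code > -Drocketmq.namesrv.addr > NAMESRV_ADDR > HTTP static server.
    static List<String> resolve(String fromCode,
                                Map<String, String> systemProps,
                                Map<String, String> env,
                                Supplier<String> httpLookup) {
        String[] candidates = {
            fromCode,
            systemProps.get("rocketmq.namesrv.addr"),
            env.get("NAMESRV_ADDR")
        };
        for (String candidate : candidates) {
            List<String> addrs = parse(candidate);
            if (!addrs.isEmpty()) {
                return addrs;
            }
        }
        // Lowest priority: only now is the HTTP static server consulted.
        return parse(httpLookup.get());
    }
}
```

In a real process the environment map would come from System.getenv() and the property map from System.getProperties(); how the HTTP content is fetched is left to the Supplier.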

2 Client Configuration

DefaultMQProducer, TransactionMQProducer, DefaultMQPushConsumer and DefaultMQPullConsumer all extend the ClientConfig class, which holds the common client configuration. Parameters follow the getXXX/setXXX convention, so each one can be configured either through Spring or in code. For example, the namesrvAddr parameter is set with producer.setNamesrvAddr("192.168.0.1:9876"); the other parameters work the same way.

2.1 Client Common Configuration

Parameter Name | Default Value | Description
--- | --- | ---
namesrvAddr | | Name Server address list; multiple NameServer addresses are separated by semicolons
clientIP | local IP | Client local IP address. Some machines fail to recognize the client IP address, in which case it must be set explicitly in code
instanceName | DEFAULT | Name of the client instance. Multiple producers and consumers created by the client actually share one internal instance (which holds network connections, thread resources, etc.)
clientCallbackExecutorThreads | 4 | Number of asynchronous callback threads in the communication layer
pollNameServerInterval | 30000 | Interval for polling the Name Server, in milliseconds
heartbeatBrokerInterval | 30000 | Interval for sending heartbeats to the Broker, in milliseconds
persistConsumerOffsetInterval | 5000 | Interval for persisting the Consumer's consumption progress, in milliseconds

2.2 Producer Configuration

Parameter Name | Default Value | Description
--- | --- | ---
producerGroup | DEFAULT_PRODUCER | Producer group name. Producers that belong to one application and send the same kind of messages should be placed in the same group
createTopicKey | TBW102 | When a message is sent to a topic that does not exist on the server, the topic is created automatically; this key specifies the topic whose route serves as the default route for the new topic
defaultTopicQueueNums | 4 | Default number of queues created when a message is sent to a topic that does not exist on the server and the topic is auto-created
sendMsgTimeout | 10000 | Timeout for sending a message, in milliseconds
compressMsgBodyOverHowmuch | 4096 | Message bodies larger than this size (in bytes) are compressed; the Consumer decompresses them automatically
retryAnotherBrokerWhenNotStoreOK | FALSE | Whether to resend to another Broker when a send returns a SendResult whose sendStatus is not SEND_OK
retryTimesWhenSendFailed | 2 | Maximum number of retries when sending fails; only applies to synchronous sending
maxMessageSize | 4MB | Maximum message size enforced by the client; sending a larger message fails. The server also enforces a limit, so both must be configured consistently
transactionCheckListener | | Listener the Broker calls back to check transaction status; it must be set to send transactional messages
checkThreadPoolMinSize | 1 | Minimum number of threads in the pool that handles Broker check-backs of Producer transaction status
checkThreadPoolMaxSize | 1 | Maximum number of threads in the pool that handles Broker check-backs of Producer transaction status
checkRequestHoldMax | 2000 | Size of the Producer's local buffer queue for Broker check-back requests on transaction status
RPCHook | null | Passed in when the Producer is created; it provides pre-processing before a message is sent and post-processing after the response is received. Users can perform security control or other operations in the pre-send hook
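The compressMsgBodyOverHowmuch and retryTimesWhenSendFailed rows can be illustrated with a small sketch. BodyCompression is a hypothetical class; using java.util.zip.Deflater as the codec and treating the threshold as "strictly larger than" are assumptions for illustration, not a statement of the client's exact implementation.

```java
import java.io.ByteArrayOutputStream;
import java.util.zip.Deflater;

// Hypothetical sketch of the compression threshold and sync retry count.
final class BodyCompression {
    // Default of compressMsgBodyOverHowmuch, in bytes.
    static final int DEFAULT_THRESHOLD = 4096;

    private BodyCompression() {}

    // Assumed rule: compress only bodies strictly larger than the threshold.
    static boolean shouldCompress(byte[] body, int threshold) {
        return body.length > threshold;
    }

    // Deflate the body; the consumer side would inflate it again.
    static byte[] compress(byte[] body) {
        Deflater deflater = new Deflater();
        deflater.setInput(body);
        deflater.finish();
        ByteArrayOutputStream out = new ByteArrayOutputStream(body.length);
        byte[] buf = new byte[1024];
        while (!deflater.finished()) {
            int n = deflater.deflate(buf);
            out.write(buf, 0, n);
        }
        deflater.end();
        return out.toByteArray();
    }

    // A synchronous send is attempted once, plus retryTimesWhenSendFailed retries.
    static int maxSyncAttempts(int retryTimesWhenSendFailed) {
        return 1 + retryTimesWhenSendFailed;
    }
}
```

With the default retryTimesWhenSendFailed of 2, a synchronous send is attempted at most 3 times.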

2.3 PushConsumer Configuration

Parameter Name | Default Value | Description
--- | --- | ---
consumerGroup | DEFAULT_CONSUMER | Consumer group name. Consumers that belong to one application, subscribe to the same messages and share the same consumption logic should be placed in the same group
messageModel | CLUSTERING | Message model; two modes are supported: cluster consumption and broadcast consumption
consumeFromWhere | CONSUME_FROM_LAST_OFFSET | Where the Consumer starts consuming after it starts. By default it resumes from the last consumed position, which covers two cases: if the last consumed position has not expired, consumption resumes from it; if it has expired, consumption starts from the first message currently in the queue
consumeTimestamp | Half an hour ago | Takes effect only when consumeFromWhere=CONSUME_FROM_TIMESTAMP
allocateMessageQueueStrategy | AllocateMessageQueueAveragely | Strategy implementing the Rebalance (queue allocation) algorithm
subscription | | Subscription relationship
messageListener | | Message listener
offsetStore | | Consumption progress store
consumeThreadMin | 10 | Minimum number of threads in the consumption thread pool
consumeThreadMax | 20 | Maximum number of threads in the consumption thread pool
consumeConcurrentlyMaxSpan | 2000 | Maximum offset span allowed within a single queue during concurrent consumption
pullThresholdForQueue | 1000 | Maximum number of pulled messages cached locally per queue
pullInterval | 0 | Pull interval in milliseconds. It is 0 because long polling is used, but a value greater than 0 can be set for flow control
consumeMessageBatchMaxSize | 1 | Maximum number of messages consumed in one batch
pullBatchSize | 32 | Maximum number of messages pulled in one batch
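pullThresholdForQueue and consumeConcurrentlyMaxSpan act as flow-control limits on the local cache of a push consumer. The sketch below is a hypothetical decision function, PullFlowControl.check, which assumes the consumer tracks how many messages are cached for a queue together with the lowest and highest cached offsets; when either limit is exceeded, the next pull is delayed rather than issued.

```java
// Hypothetical flow-control check for one message queue (not RocketMQ code).
final class PullFlowControl {
    enum Decision { PULL_NOW, DELAY_TOO_MANY_CACHED, DELAY_SPAN_TOO_LARGE }

    private PullFlowControl() {}

    // cachedCount: messages cached locally for the queue.
    // minOffset / maxOffset: smallest and largest cached offsets.
    static Decision check(long cachedCount, long minOffset, long maxOffset,
                          int pullThresholdForQueue, int consumeConcurrentlyMaxSpan) {
        if (cachedCount > pullThresholdForQueue) {
            return Decision.DELAY_TOO_MANY_CACHED;
        }
        // A large span means an old message is still unfinished while newer ones pile up.
        if (cachedCount > 0 && maxOffset - minOffset > consumeConcurrentlyMaxSpan) {
            return Decision.DELAY_SPAN_TOO_LARGE;
        }
        return Decision.PULL_NOW;
    }
}
```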

2.4 PullConsumer Configuration

Parameter Name | Default Value | Description
--- | --- | ---
consumerGroup | DEFAULT_CONSUMER | Consumer group name. Consumers that belong to one application, subscribe to the same messages and share the same consumption logic should be placed in the same group
brokerSuspendMaxTimeMillis | 20000 | In long polling, the maximum time a Consumer pull request can be suspended in the Broker, in milliseconds
consumerTimeoutMillisWhenSuspend | 30000 | In long polling, if a Consumer pull request is suspended in the Broker longer than this value, the client treats it as timed out. Unit: milliseconds
consumerPullTimeoutMillis | 10000 | Timeout for pulling messages without long polling, in milliseconds
messageModel | BROADCASTING | Message model; two modes are supported: cluster consumption and broadcast consumption
messageQueueListener | | Listener for changes in message queues
offsetStore | | Consumption progress store
registerTopics | | Set of registered topics
allocateMessageQueueStrategy | AllocateMessageQueueAveragely | Strategy implementing the Rebalance algorithm
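With long polling, the Broker may hold a pull request for up to brokerSuspendMaxTimeMillis, so the client-side timeout consumerTimeoutMillisWhenSuspend needs to be larger; otherwise an idle long poll would be reported as a timeout. The LongPollingTimeouts class below is a hypothetical check of that relationship, not a RocketMQ API.

```java
// Hypothetical consistency check for the two long-polling timeouts.
final class LongPollingTimeouts {
    private LongPollingTimeouts() {}

    // The client must keep waiting longer than the Broker may suspend the request.
    static boolean isValid(long brokerSuspendMaxTimeMillis, long consumerTimeoutMillisWhenSuspend) {
        return consumerTimeoutMillisWhenSuspend > brokerSuspendMaxTimeMillis;
    }

    // Margin left for the response to reach the client after the Broker releases the request.
    static long marginMillis(long brokerSuspendMaxTimeMillis, long consumerTimeoutMillisWhenSuspend) {
        return consumerTimeoutMillisWhenSuspend - brokerSuspendMaxTimeMillis;
    }
}
```

With the defaults in the table (20000 and 30000), the check passes with a 10000 ms margin.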

2.5 Message Data Structure

Field Name | Default Value | Description
--- | --- | ---
Topic | null | Required; the name of the topic the message belongs to
Body | null | Required; the message body
Tags | null | Optional; message tag, convenient for server-side filtering. Currently only one tag per message is supported
Keys | null | Optional; the message's business keys. The server builds a hash index on the keys, so once they are set you can find the message by topic and key in the Console. Because it is a hash index, make keys as unique as possible, such as an order number or product ID
Flag | 0 | Optional; entirely up to the application, RocketMQ does not interpret it
DelayTimeLevel | 0 | Optional; message delay level. 0 means no delay; a value greater than 0 delays delivery of the message to consumers
WaitStoreMsgOK | TRUE | Optional; whether the server returns a response only after the message has been stored to disk

Cluster Setup

1 Single Master mode

This is the simplest but also the riskiest mode: once the broker restarts or goes down, the entire service becomes unavailable. It is not recommended for production environments, but can be used for local development and testing. The steps are as follows.

1)Start NameServer

### Start Name Server first
$ nohup sh mqnamesrv &
 
### Then verify that the Name Server starts successfully
$ tail -f ~/logs/rocketmqlogs/namesrv.log
The Name Server boot success...

The message 'The Name Server boot success...' in namesrv.log indicates that the NameServer has started successfully.

2)Start Broker

### Then start the Broker
$ nohup sh mqbroker -n localhost:9876 &

Then verify that the broker has started successfully. For example, if the broker IP is 192.168.1.2 and its name is broker-a:

$ tail -f ~/logs/rocketmqlogs/broker.log
The broker[broker-a,192.168.1.2:10911] boot success...

The message 'The broker[brokerName,ip:port] boot success...' in broker.log indicates that the broker has started successfully.

2 Multiple Master mode

Multiple master mode consists only of master nodes (for example, 2 or 3 masters) with no slave nodes. The advantages and disadvantages of this mode are as follows:

  • Advantages:
    1. Simple configuration.
    2. An outage or restart (for maintenance) of a single master node has no impact on the application.
    3. When the disk is configured as RAID10, messages are not lost even if a machine cannot be recovered, because RAID10 is very reliable (with asynchronous disk flushing, a small number of messages may be lost; with synchronous disk flushing, no messages are lost).
    4. This mode offers the highest performance.
  • Disadvantages:
    1. While a single machine is down, the unconsumed messages on it cannot be subscribed to until the machine recovers, which affects the real-time delivery of those messages.

The starting steps for multiple master mode are as follows:

1)Start NameServer

### Start Name Server first
$ nohup sh mqnamesrv &
 
### Then verify that the Name Server starts successfully
$ tail -f ~/logs/rocketmqlogs/namesrv.log
The Name Server boot success...

2)Start the Broker cluster

### For example, start the first Master on machine A, assuming the configured NameServer IP is 192.168.1.1.
$ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-noslave/broker-a.properties &

### Then start the second Master on machine B, assuming the configured NameServer IP is 192.168.1.1.
$ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-noslave/broker-b.properties &


The commands shown above assume a single NameServer. For a cluster of multiple NameServers, the address list after the -n argument of the broker start command is separated by semicolons, for example 192.168.1.1:9876;192.168.1.2:9876 (quote the list in the shell, e.g. -n '192.168.1.1:9876;192.168.1.2:9876', because ';' otherwise ends the command).

3 Multiple Master And Multiple Slave Mode-Asynchronous replication

Each master node is configured with at least one slave node, forming multiple master-slave pairs. HA uses asynchronous replication, so there is a short (millisecond-level) message lag between the master node and the slave node. The advantages and disadvantages of this mode are as follows:

  • Advantages:
    1. Even if a disk is corrupted, very few messages are lost, and the real-time delivery of messages is not affected.
    2. When a master node goes down, consumers can still consume messages from the slave node; this is transparent to the application and requires no manual intervention.
    3. Performance is almost as high as in multiple master mode.
  • Disadvantages:
    1. A small number of messages may be lost if a master node goes down and its disk is corrupted.

The starting steps for multiple master and multiple slave mode are as follows:

1)Start NameServer

### Start Name Server first
$ nohup sh mqnamesrv &
 
### Then verify that the Name Server starts successfully
$ tail -f ~/logs/rocketmqlogs/namesrv.log
The Name Server boot success...

2)Start the Broker cluster

### For example, start the first Master on machine A, assuming the configured NameServer IP is 192.168.1.1 and the port is 9876.
$ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-async/broker-a.properties &

### Then start the second Master on machine B, assuming the configured NameServer IP is 192.168.1.1 and the port is 9876.
$ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-async/broker-b.properties &

### Then start the first Slave on machine C, assuming the configured NameServer IP is 192.168.1.1 and the port is 9876.
$ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-async/broker-a-s.properties &

### Finally, start the second Slave on machine D, assuming the configured NameServer IP is 192.168.1.1 and the port is 9876.
$ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-async/broker-b-s.properties &

The commands above start a 2M-2S-Async deployment; other nM-nS-Async deployments are started in the same way.

4 Multiple Master And Multiple Slave Mode-Synchronous dual write

In this mode, each master node is configured with at least one slave node, forming multiple Master-Slave pairs. HA uses synchronous double write: a success response is returned to the application only after the message has been written to the master node and successfully replicated to a slave node.

The advantages and disadvantages of this mode are as follows:

  • Advantages:
    1. Neither data nor service has a single point of failure.
    2. When a master node goes down, message delivery is not delayed.
    3. Service availability and data availability are very high.
  • Disadvantages:
    1. Performance is slightly lower than in asynchronous replication mode (about 10% lower).
    2. The RT (response time) for sending a single message is slightly higher, and in the current version the slave node cannot automatically switch to master after the master node goes down.

The starting steps are as follows:

1)Start NameServer

### Start Name Server first
$ nohup sh mqnamesrv &
 
### Then verify that the Name Server starts successfully
$ tail -f ~/logs/rocketmqlogs/namesrv.log
The Name Server boot success...

2)Start the Broker cluster

### For example, start the first Master on machine A, assuming the configured NameServer IP is 192.168.1.1 and the port is 9876.
$ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-a.properties &

### Then start the second Master on machine B, assuming the configured NameServer IP is 192.168.1.1 and the port is 9876.
$ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-b.properties &

### Then start the first Slave on machine C, assuming the configured NameServer IP is 192.168.1.1 and the port is 9876.
$ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-a-s.properties &

### Finally, start the second Slave on machine D, assuming the configured NameServer IP is 192.168.1.1 and the port is 9876.
$ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-b-s.properties &

A Master and its Slave are paired by configuring the same "brokerName". The "brokerId" of the master node must be 0, and the "brokerId" of each slave node must be greater than 0.
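The pairing rule can be expressed as a check over (brokerName, brokerId) pairs, as they would be read from each broker's properties file. BrokerPairing is a hypothetical validator, not part of RocketMQ, and the extra rule that brokerIds must be unique within one brokerName is an assumption added for illustration.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical validator for the brokerName / brokerId pairing rule.
final class BrokerPairing {
    private BrokerPairing() {}

    // Each entry is (brokerName, brokerId). Returns null if valid, else a problem description.
    static String validate(List<Map.Entry<String, Long>> brokers) {
        Map<String, Set<Long>> idsByName = new HashMap<>();
        for (Map.Entry<String, Long> broker : brokers) {
            String name = broker.getKey();
            long id = broker.getValue();
            if (id < 0) {
                return "negative brokerId " + id + " in " + name;
            }
            // Assumed extra rule: brokerIds are unique within one brokerName.
            if (!idsByName.computeIfAbsent(name, k -> new HashSet<>()).add(id)) {
                return "duplicate brokerId " + id + " in " + name;
            }
        }
        // Every brokerName needs exactly one master (brokerId 0); duplicates were caught above.
        for (Map.Entry<String, Set<Long>> group : idsByName.entrySet()) {
            if (!group.getValue().contains(0L)) {
                return group.getKey() + " has no master (brokerId 0)";
            }
        }
        return null;
    }
}
```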

Load Balancing

Load balancing in RocketMQ is performed on the client side. It can be divided into load balancing on the Producer side when sending messages and load balancing on the Consumer side when subscribing to messages.

Producer Load Balancing

When the Producer sends a message, it first looks up the TopicPublishInfo for the Topic. After obtaining the routing information in TopicPublishInfo, the RocketMQ client by default selects a queue (MessageQueue) from the messageQueueList in TopicPublishInfo to send the message to. The specific fault-tolerance strategy is defined in the MQFaultStrategy class.

The sendLatencyFaultEnable switch controls this strategy. If it is turned on, queues on unavailable Brokers are filtered out on top of round-robin selection (an index that starts at a random value, is incremented on each send, and is taken modulo the number of queues). "Latency fault tolerance" means avoiding a Broker for a certain period after earlier failures or slow responses. For example, if the latency of the last request exceeded 550 ms, the Broker is avoided for 3000 ms; if it exceeded 1000 ms, it is avoided for 60000 ms. If the switch is turned off, a queue (MessageQueue) is chosen by plain round-robin. The latency fault-tolerance mechanism is the key to achieving high availability of message sending.
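A simplified model of this selection logic follows. LatencyAwareSelector is hypothetical and is not the MQFaultStrategy code: it only uses the two latency thresholds quoted above, keys the avoidance window by broker name, and falls back to plain round-robin when every broker is being avoided, which is a simplifying assumption.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical latency fault tolerance plus round-robin queue selection.
final class LatencyAwareSelector {
    // Only the two thresholds quoted in the text are modeled.
    private static final long[] LATENCY_MAX = {550L, 1000L};
    private static final long[] NOT_AVAILABLE_DURATION = {3000L, 60000L};

    private final boolean sendLatencyFaultEnable;
    // Round-robin counter that starts at a random value.
    private final AtomicInteger roundRobin =
            new AtomicInteger(ThreadLocalRandom.current().nextInt(1000));
    // Broker name -> time (ms) before which the broker is avoided.
    private final Map<String, Long> availableAfter = new HashMap<>();

    LatencyAwareSelector(boolean sendLatencyFaultEnable) {
        this.sendLatencyFaultEnable = sendLatencyFaultEnable;
    }

    // How long to avoid a broker after observing the given latency.
    static long avoidanceMillis(long latencyMillis) {
        for (int i = LATENCY_MAX.length - 1; i >= 0; i--) {
            if (latencyMillis > LATENCY_MAX[i]) {
                return NOT_AVAILABLE_DURATION[i];
            }
        }
        return 0L;
    }

    void recordLatency(String broker, long latencyMillis, long nowMillis) {
        availableAfter.put(broker, nowMillis + avoidanceMillis(latencyMillis));
    }

    boolean isAvailable(String broker, long nowMillis) {
        return nowMillis >= availableAfter.getOrDefault(broker, 0L);
    }

    // queueBrokers.get(i) is the broker hosting queue i; returns the chosen queue index.
    int selectQueue(List<String> queueBrokers, long nowMillis) {
        int n = queueBrokers.size();
        if (sendLatencyFaultEnable) {
            for (int i = 0; i < n; i++) {
                int pos = Math.floorMod(roundRobin.getAndIncrement(), n);
                if (isAvailable(queueBrokers.get(pos), nowMillis)) {
                    return pos;
                }
            }
        }
        // Switch off, or every broker is currently avoided: plain round-robin.
        return Math.floorMod(roundRobin.getAndIncrement(), n);
    }
}
```

After broker-a answers in 600 ms, selection skips its queues for the next 3000 ms and lands on broker-b's queues instead.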

Consumer Load Balancing

In RocketMQ, both consumption modes on the Consumer side (Push/Pull) obtain messages by pulling. Push mode is only a wrapper around pull mode: a message-pulling thread pulls a batch of messages from the server, submits them to the message-consuming thread pool, and then immediately tries to pull from the server again; if no message is pulled, the next pull is delayed and then continues. In both modes, the Consumer needs to know which message queue on the Broker side to fetch messages from. Load balancing is therefore needed on the Consumer side, that is, deciding which Consumer in the same ConsumerGroup consumes each of the MessageQueues on the Broker side.

1 Heartbeat Packet Sending on Consumer side

After the Consumer starts, a scheduled task continuously sends heartbeat packets to all Broker instances in the RocketMQ cluster (containing the consumer group name, the set of subscriptions, the message model, the client id, etc.). After receiving a heartbeat from a Consumer, the Broker stores it in ConsumerManager's local cache variable consumerTable, and stores the encapsulated client network channel information in the local cache variable channelInfoTable. Together they provide the metadata for the subsequent Consumer-side load balancing.
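The Broker-side bookkeeping can be pictured as a map from consumer group to client ids with their last heartbeat time. ConsumerTableSketch is a hypothetical, simplified stand-in for consumerTable and channelInfoTable (it merges them into one map), and the expiry timeout passed to its constructor is an assumed parameter.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical model of Broker-side consumer bookkeeping (not ConsumerManager).
final class ConsumerTableSketch {
    // consumer group -> (client id -> last heartbeat time in ms)
    private final Map<String, Map<String, Long>> consumerTable = new ConcurrentHashMap<>();
    private final long expireMillis;

    ConsumerTableSketch(long expireMillis) {
        this.expireMillis = expireMillis;
    }

    // Called for every heartbeat packet received from a consumer.
    void onHeartbeat(String group, String clientId, long nowMillis) {
        consumerTable.computeIfAbsent(group, g -> new ConcurrentHashMap<>()).put(clientId, nowMillis);
    }

    // Drop clients whose last heartbeat is older than the expiry timeout.
    void scanExpired(long nowMillis) {
        for (Map<String, Long> clients : consumerTable.values()) {
            clients.values().removeIf(last -> nowMillis - last > expireMillis);
        }
    }

    // Sorted client ids of a group, the kind of list the rebalance step asks for.
    Set<String> consumerIds(String group) {
        return new TreeSet<>(consumerTable.getOrDefault(group, Map.of()).keySet());
    }
}
```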

2 Core Class for Load Balancing on Consumer side—RebalanceImpl

When a Consumer instance starts, it starts its MQClientInstance, which in turn starts the load-balancing service thread RebalanceService (executed every 20 s). The source code shows that the run() method of RebalanceService calls the rebalanceByTopic() method of the RebalanceImpl class, which is the core of Consumer-side load balancing. rebalanceByTopic() handles "broadcast mode" and "cluster mode" differently. The main processing flow in cluster mode is as follows:

1) Get the set of message consumption queues (mqSet) for the Topic from the local cache variable topicSubscribeInfoTable of the rebalanceImpl instance.
2) Call the mQClientFactory.findConsumerIdList() method to send an RPC request to the Broker, which returns the list of consumer ids in the consumer group for the given topic and consumer group (the Broker answers from the consumerTable it built from the heartbeat data reported by the consumers; request code: GET_CONSUMER_LIST_BY_GROUP);
3) Sort the message queues under the Topic and the consumer ids, then compute the message queues this consumer should pull using the message queue allocation strategy (default: the average allocation algorithm). The average allocation algorithm is similar to pagination: the MessageQueues are treated as records and the consumers as pages. It computes the average page size and the record range of each page, and finally iterates over that range to get the records (here, MessageQueues) allocated to the current consumer.
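The paging-style average allocation in step 3 can be sketched as follows. The class AverageAllocation is hypothetical; it follows the idea described above (the first "queues % consumers" consumers take one extra queue) and assumes that every client sorts the queue list and the consumer id list the same way, so that the results do not overlap.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of average ("paging") queue allocation.
final class AverageAllocation {
    private AverageAllocation() {}

    // mqAll and cidAll must be sorted identically on every client.
    static <Q> List<Q> allocate(List<Q> mqAll, List<String> cidAll, String currentCid) {
        List<Q> result = new ArrayList<>();
        int index = cidAll.indexOf(currentCid);
        if (index < 0 || mqAll.isEmpty()) {
            return result;
        }
        int queues = mqAll.size();
        int consumers = cidAll.size();
        int mod = queues % consumers;
        // Page size: the first `mod` consumers take one extra queue.
        int averageSize = queues <= consumers ? 1
                : (mod > 0 && index < mod ? queues / consumers + 1 : queues / consumers);
        // First record of this consumer's page.
        int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
        // Surplus consumers get a non-positive range, i.e. nothing.
        int range = Math.min(averageSize, queues - startIndex);
        for (int i = 0; i < range; i++) {
            result.add(mqAll.get((startIndex + i) % queues));
        }
        return result;
    }
}
```

With 8 queues and consumers c0, c1, c2, this yields [0, 1, 2], [3, 4, 5] and [6, 7]; with more consumers than queues, the surplus consumers receive nothing.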


4) Then updateProcessQueueTableInRebalance() is invoked, which first compares the allocated message queue set (mqSet) with processQueueTable and filters it.


  • The entries of processQueueTable marked in red in the figure above are not in the allocated message queue set mqSet. Their Dropped attribute is set to true, and removeUnnecessaryMessageQueue() is then executed to check whether they can be removed from the processQueueTable cache variable: it tries for up to 1 s to acquire the lock of the current consumption processing queue, returning true if the lock is acquired and false if it is still unavailable after waiting 1 s. If true is returned, the corresponding entry is removed from the processQueueTable cache variable.

  • The entries of processQueueTable marked in green in the figure above are the intersection with the allocated message queue set mqSet. For these, the client checks whether the ProcessQueue has expired. Nothing is done in Pull mode; in Push mode, the Dropped attribute is set to true and removeUnnecessaryMessageQueue() is called to try to remove the entry as above.

Finally, for each MessageQueue in the filtered set mqSet that is not yet in processQueueTable, a ProcessQueue object is created and stored in the processQueueTable of RebalanceImpl. Here the computePullFromWhere(MessageQueue mq) method of the RebalanceImpl instance is called to obtain the next consumption offset of the MessageQueue, which is filled into the pullRequest object created next. A pull request object, pullRequest, is then created and added to the pull list pullRequestList, and finally dispatchPullRequest() is executed: each pullRequest is put in turn into the blocking queue pullRequestQueue of the PullMessageService thread, which takes it out and sends the pull request to the Broker.
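Step 4 and the final step amount to a diff between the current processQueueTable and the newly allocated mqSet. ProcessQueueTableSketch is a hypothetical simplification: queues are identified by name, the lock check in removeUnnecessaryMessageQueue() and the expiry check for Push mode are omitted, and computePullFromWhere is passed in as a function.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.ToLongFunction;

// Hypothetical simplification of updateProcessQueueTableInRebalance(); not RocketMQ code.
final class ProcessQueueTableSketch {
    static final class ProcessQueue {
        volatile boolean dropped;
        final long nextOffset; // where the first pull request for this queue starts

        ProcessQueue(long nextOffset) {
            this.nextOffset = nextOffset;
        }
    }

    // Message queue name -> its local processing state.
    final Map<String, ProcessQueue> processQueueTable = new HashMap<>();

    // Returns the queues that need a new pull request, in sorted order.
    List<String> update(Set<String> mqSet, ToLongFunction<String> computePullFromWhere) {
        // Queues no longer allocated: mark dropped, then remove (lock check omitted).
        processQueueTable.entrySet().removeIf(e -> {
            if (mqSet.contains(e.getKey())) {
                return false;
            }
            e.getValue().dropped = true;
            return true;
        });
        // Newly allocated queues: create a ProcessQueue and schedule a pull request.
        List<String> pullRequests = new ArrayList<>();
        for (String mq : mqSet) {
            if (!processQueueTable.containsKey(mq)) {
                processQueueTable.put(mq, new ProcessQueue(computePullFromWhere.applyAsLong(mq)));
                pullRequests.add(mq);
            }
        }
        Collections.sort(pullRequests);
        return pullRequests;
    }
}
```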

The core design principle of message consumption queues is that, within the same consumer group, a message queue can be consumed by only one consumer at a time, while one consumer can consume multiple message queues at the same time.

RocketMQ, a distributed message queue, differs from other MQ middleware in how it filters messages: filtering happens when the Consumer subscribes to (pulls) messages. The reason lies in its storage mechanism, which separates the Producer writing messages from the Consumer subscribing to them. When subscribing, the Consumer first gets an index entry from the logical message queue ConsumeQueue and then uses it to read the message entity from the CommitLog, so filtering cannot bypass this storage structure. Each ConsumeQueue entry contains an 8-byte Message Tag hashcode, and Tag-based filtering relies on this hashcode.

RocketMQ has two main filter types:

  • Tag filtering: when subscribing, the Consumer can specify message tags in addition to the topic; multiple tags are separated by '||'. The Consumer builds the subscription into a SubscriptionData object and sends a pull request to the Broker. Before reading data from the RocketMQ storage layer (Store), the Broker constructs a MessageFilter from the SubscriptionData object and passes it to the Store. When the Store reads an entry from ConsumeQueue, it filters the message by the stored tag hashcode. Because only the hashcode is compared, the server cannot filter exactly, so after pulling the messages the Consumer also compares the original tag string of each message and ignores messages whose tag does not match the subscription.

  • SQL92 filtering: this works almost the same way as Tag filtering; the only difference is how the Store filters. The rocketmq-filter module is responsible for building and executing the actual SQL expression. Evaluating an SQL expression on every filter call would hurt efficiency, so RocketMQ uses a BloomFilter to avoid evaluating it every time. The context of an SQL92 expression is the message's properties.
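The two-stage Tag filter can be sketched as below. TagFilterSketch is hypothetical; it assumes the stored tag code is the tag string's String.hashCode() and that "*" subscribes to all tags. The collision case shows why the Consumer must re-check the original tag string: "Aa" and "BB" have the same hashCode, so the Broker-side check alone cannot tell them apart.

```java
import java.util.Arrays;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical two-stage tag filter (Broker hash check, then client string check).
final class TagFilterSketch {
    final Set<String> tags;      // original tag strings from the subscription expression
    final Set<Integer> tagCodes; // their hash codes, which is all the Broker compares

    TagFilterSketch(String subExpression) {
        this.tags = Arrays.stream(subExpression.split("\\|\\|"))
                .map(String::trim)
                .filter(s -> !s.isEmpty())
                .collect(Collectors.toSet());
        this.tagCodes = tags.stream().map(String::hashCode).collect(Collectors.toSet());
    }

    boolean subscribesAll() {
        return tags.contains("*");
    }

    // Broker side: only the tag hash code stored in the ConsumeQueue entry is available.
    boolean brokerAccepts(long tagsCodeInConsumeQueue) {
        return subscribesAll() || tagCodes.contains((int) tagsCodeInConsumeQueue);
    }

    // Client side: re-check the real tag string to discard hash collisions.
    boolean clientAccepts(String messageTag) {
        return subscribesAll() || tags.contains(messageTag);
    }
}
```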

This section focuses on the configuration of the system (JVM/OS)

1 JVM Options

The latest released version of JDK 1.8 is recommended. Set Xms and Xmx to the same value to prevent the JVM from resizing the heap, for better performance. A simple JVM configuration is as follows:

-server -Xms8g -Xmx8g -Xmn4g

Direct ByteBuffer memory size. A full GC is triggered when Direct ByteBuffer usage reaches the specified size:

-XX:MaxDirectMemorySize=15g

If you don't care about the boot time of the RocketMQ broker, pre-touching the Java heap so that every page is allocated during JVM initialization is a better choice:

-XX:+AlwaysPreTouch

Disabling biased locking may reduce JVM pauses:

-XX:-UseBiasedLocking

As for garbage collection, G1 collector with JDK 1.8 is recommended:

-XX:+UseG1GC -XX:G1HeapRegionSize=16m 
-XX:G1ReservePercent=25
-XX:InitiatingHeapOccupancyPercent=30

These GC options look a little aggressive, but they have proven to perform well in our production environment.

Don't set too small a value for -XX:MaxGCPauseMillis; otherwise the JVM will use a small young generation to meet the goal, which causes very frequent minor GCs. Rolling GC log files are recommended:

-XX:+UseGCLogFileRotation 
-XX:NumberOfGCLogFiles=5 
-XX:GCLogFileSize=30m

If writing the GC log increases broker latency, consider redirecting the GC log file to an in-memory file system:

-Xloggc:/dev/shm/mq_gc_%p.log

2 Linux Kernel Parameters

The os.sh script in the bin folder lists many kernel parameters that can be used in production with minor changes. The following parameters need attention; for more details, refer to the documentation for /proc/sys/vm/*.

  • vm.extra_free_kbytes tells the VM to keep extra free memory between the threshold where background reclaim (kswapd) kicks in and the threshold where direct reclaim (by allocating processes) kicks in. RocketMQ uses this parameter to avoid high latency in memory allocation. (Whether this parameter exists depends on the kernel version.)

  • vm.min_free_kbytes: if you set this lower than 1024KB, your system will become subtly broken and prone to deadlock under high loads.

  • vm.max_map_count limits the maximum number of memory map areas a process may have. RocketMQ uses mmap to load CommitLog and ConsumeQueue, so setting a larger value for this parameter is recommended.

  • vm.swappiness defines how aggressively the kernel swaps memory pages. Higher values increase aggressiveness; lower values decrease the amount of swapping. A value of 10 is recommended to avoid swap latency.

  • File descriptor limits: RocketMQ needs file descriptors for files (CommitLog and ConsumeQueue) and network connections. We recommend setting the limit to 655350.

  • Disk scheduler, the deadline I/O scheduler is recommended for RocketMQ, which attempts to provide a guaranteed latency for requests.

1 Message Model

RocketMQ message model is mainly composed of Producer, Broker and Consumer. The producer is responsible for producing messages and the consumer is for consuming messages, while the broker stores messages.

The broker is an independent server in actual deployments, and each broker can store messages from multiple topics. Messages from the same topic can even be stored in different brokers through sharding.

A message queue stores the physical offsets of messages, and the message addresses of a topic are stored across multiple queues. A consumer group consists of multiple consumer instances.

2 Producer

The Producer is responsible for producing messages, typically in business systems. It sends the messages generated by those systems to brokers. RocketMQ provides multiple sending paradigms: synchronous, asynchronous, sequential and one-way. Both synchronous and asynchronous sending require an acknowledgment from the Broker, whereas one-way sending does not.

3 Consumer

The Consumer is responsible for consuming messages, typically in a background system that consumes asynchronously. The consumer pulls messages from brokers and feeds them into the application. From the user's perspective, two types of consumers are provided: pull consumer and push consumer.

4 Topic

The Topic refers to a collection of one kind of message. Each topic contains several messages and one message can only belong to one topic. The topic is the basic unit of RocketMQ for message subscription.

5 Broker Server

Acting as a transfer station, the Broker Server stores and forwards messages. In RocketMQ, the broker server is responsible for receiving messages sent from producers, storing them and preparing to handle pull requests. It also stores related message metadata, including consumer groups, consumption progress, topics, queue information and so on.

6 Name Server

The Name Server serves as the provider of routing service. The producer or the consumer can find the list of broker IP addresses for each topic through name server. Multiple name servers can be deployed in one cluster, but they are independent of each other and do not exchange information.

7 Pull Consumer

A type of Consumer in which the application pulls messages from brokers by actively calling the consumer's pull method, which lets the application control the timing and frequency of pulling. Once a batch of messages is pulled, the application starts the consumption process.

8 Push Consumer

A type of Consumer with high real-time performance: from the application's perspective, the Broker actively pushes messages to the consumer as soon as it receives them.

9 Producer Group

A collection of the same type of Producer, which sends the same type of messages with consistent logic. If a transaction message is sent and the original producer crashes after sending, the broker server will contact other producers in the same producer group to commit or rollback the transactional message.

10 Consumer Group

A collection of the same type of Consumer, which consume the same type of messages with consistent logic. Consumer groups make load balancing and fault tolerance in message consumption much easier.

Warning: consumer instances of one consumer group must have exactly the same topic subscription(s).

RocketMQ supports two consumption modes: Clustering and Broadcasting.

11 Consumption Mode - Clustering

Under the Clustering mode, the messages of a topic are distributed among the consumer instances as evenly as possible. That is, each message is consumed by only one consumer instance.

12 Consumption Mode - Broadcasting

Under the Broadcasting mode, each consumer instance of the same consumer group receives every message published to the corresponding topic.
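The difference between the two modes can be shown with a toy distribution function. ConsumptionModes is hypothetical; real clustering consumption assigns whole message queues to instances through rebalancing, so the per-message round-robin used here is only a simplification.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of who receives which message in each consumption mode.
final class ConsumptionModes {
    private ConsumptionModes() {}

    // Clustering: each message goes to exactly one instance (round-robin here).
    static Map<String, List<String>> clustering(List<String> messages, List<String> consumers) {
        Map<String, List<String>> out = new LinkedHashMap<>();
        for (String c : consumers) {
            out.put(c, new ArrayList<>());
        }
        for (int i = 0; i < messages.size(); i++) {
            out.get(consumers.get(i % consumers.size())).add(messages.get(i));
        }
        return out;
    }

    // Broadcasting: every instance receives every message.
    static Map<String, List<String>> broadcasting(List<String> messages, List<String> consumers) {
        Map<String, List<String>> out = new LinkedHashMap<>();
        for (String c : consumers) {
            out.put(c, new ArrayList<>(messages));
        }
        return out;
    }
}
```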

13 Normal Ordered Message

Under the Normal Ordered Message mode, the messages received by consumers from the same ConsumeQueue are sequential, but the messages received from the different message queues may be non-sequential.

14 Strictly Ordered Message

Under the Strictly Ordered Message mode, all messages received by the consumers from the same topic are in the order in which they were stored.
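Normal ordering relies on related messages landing in the same queue. In the RocketMQ client this is typically done with a MessageQueueSelector passed to send(); the OrderedQueueSelector below is a hypothetical stand-in that only shows the selection rule, choosing the queue from the hash of a business key such as an order id.

```java
// Hypothetical queue-selection rule for ordered messages (not the RocketMQ API).
final class OrderedQueueSelector {
    private OrderedQueueSelector() {}

    // Same key -> same queue, so messages sharing a key stay in send order within it.
    // floorMod keeps the index non-negative even for negative hash codes.
    static int selectQueue(String orderKey, int queueCount) {
        return Math.floorMod(orderKey.hashCode(), queueCount);
    }
}
```

Messages with different keys may land in different queues, which is why ordering is only guaranteed per queue in the normal mode.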

15 Message

The physical carrier of the information transmitted by a messaging system, and the smallest unit of produced and consumed data. Each message must belong to one topic.

Each Message in RocketMQ has a unique message ID and can carry a key that stores a business-related value. Messages can be queried by ID or by key.

16 Tag

Flags set on messages to distinguish different types of messages under the same topic, acting as a "sub-topic". Messages from the same business unit can carry different tags under the same topic for different business purposes. Tags help keep code clear and consistent and make better use of the query system RocketMQ provides. Consumers can subscribe to different "sub-topics" by tag, which improves extensibility.
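Consumers select tags through a subscription expression, for example consumer.subscribe("TopicTest", "TagA || TagC"), where "*" means all tags. The sketch below models only the matching semantics of such an expression in plain Java; TagFilter is a hypothetical class, not RocketMQ's own filtering code.

```java
import java.util.HashSet;
import java.util.Set;

// Sketch of tag matching for an expression such as "TagA || TagB":
// "*" (or an empty expression) accepts every tag; otherwise the message tag
// must equal one of the listed tags.
class TagFilter {
    private final Set<String> tags = new HashSet<>();
    private final boolean acceptAll;

    TagFilter(String expression) {
        String expr = expression == null ? "" : expression.trim();
        acceptAll = expr.isEmpty() || expr.equals("*");
        if (!acceptAll) {
            for (String part : expr.split("\\|\\|")) { // split on the literal "||"
                String tag = part.trim();
                if (!tag.isEmpty()) {
                    tags.add(tag);
                }
            }
        }
    }

    boolean accepts(String messageTag) {
        return acceptAll || (messageTag != null && tags.contains(messageTag));
    }
}
```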

Before introducing the mqadmin management tool, note the following points:

  • Commands are executed as: ./mqadmin {command} {args}
  • Almost all commands require the -n option, which gives the NameServer address in the format ip:port;
  • Almost all commands print help information with the -h option;
  • If both the broker address option (-b) and the clusterName option (-c) are given, the command runs against the broker specified by -b. The -b option accepts only a single address, in the format ip:port; the default port is 10911. If -b is not given, the command is applied to all brokers in the cluster.
  • Many commands can be found under tools, but only those initialized in MQAdminStartup can be used. You can modify this class to add or customize commands;
  • Because of version updates, a few commands may not have been updated in time. If you hit an error, read the source code of the related command to resolve it.

1 Topic-related commands

| Name | Meaning | Option | Description |
| --- | --- | --- | --- |
| updateTopic | Create or update a topic's configuration | -b | Address of the broker on which the topic is located, in the format ip:port; only a single broker is supported |
| | | -c | Name of the cluster the topic belongs to (cluster names can be queried with clusterList) |
| | | -h | Print help information |
| | | -n | NameServer address, in the format ip:port |
| | | -p | Read/write permission of the new topic (W=2, R=4, WR=6) |
| | | -r | Number of readable queues (default 8) |
| | | -w | Number of writable queues (default 8) |
| | | -t | Topic name (may only contain characters matching ^[a-zA-Z0-9_-]+$) |
| deleteTopic | Delete a topic | -c | Cluster name; the topic is deleted from the specified cluster (cluster names can be queried with clusterList) |
| | | -h | Print help information |
| | | -n | NameServer address, in the format ip:port |
| | | -t | Topic name (may only contain characters matching ^[a-zA-Z0-9_-]+$) |
| topicList | View the topic list | -h | Print help information |
| | | -c | Without -c, only the topic list is returned; with -c, clusterName, topic and consumerGroup are also returned, i.e. the cluster and subscriptions of each topic. The option takes no value |
| | | -n | NameServer address, in the format ip:port |
| topicRoute | View the routing information of a topic | -t | Topic name |
| | | -h | Print help information |
| | | -n | NameServer address, in the format ip:port |
| topicStatus | View the offsets of a topic's message queues | -t | Topic name |
| | | -h | Print help information |
| | | -n | NameServer address, in the format ip:port |
| topicClusterList | View the clusters a topic belongs to | -t | Topic name |
| | | -h | Print help information |
| | | -n | NameServer address, in the format ip:port |
| updateTopicPerm | Update the read/write permission of a topic | -t | Topic name |
| | | -h | Print help information |
| | | -n | NameServer address, in the format ip:port |
| | | -b | Address of the broker on which the topic is located, in the format ip:port; only a single broker is supported |
| | | -p | Read/write permission of the topic (W=2, R=4, WR=6) |
| | | -c | Name of the cluster the topic belongs to (can be queried with clusterList). -b takes precedence; if -b is not given, the command is executed on all brokers in the cluster |
| updateOrderConf | Create, delete, or get the key-value configuration of a specific namespace on the NameServer (not yet enabled) | -h | Print help information |
| | | -n | NameServer address, in the format ip:port |
| | | -t | topic, used as the key |
| | | -v | orderConf, used as the value |
| | | -m | method; one of get, put, delete |
| allocateMQ | Compute, with the average load algorithm, how a topic's message queues are allocated among a list of consumers | -t | Topic name |
| | | -h | Print help information |
| | | -n | NameServer address, in the format ip:port |
| | | -i | ipList, comma-separated; the consumer IPs among which the topic's message queues are allocated |
| statsAll | Print topic subscriptions, TPS, accumulated amounts, 24-hour read/write totals, etc. | -h | Print help information |
| | | -n | NameServer address, in the format ip:port |
| | | -a | Whether to print only active topics |
| | | -t | Topic name |
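The -p values in the table above behave as bit flags: W=2 and R=4 are independent bits, so WR=6 is simply 2 | 4. The sketch below decodes such a value; TopicPerm and its constant names are illustrative, not taken from the RocketMQ source.

```java
// Sketch: decode the -p permission value of updateTopic / updateTopicPerm.
// W=2 and R=4 are separate bits, so WR=6 sets both.
class TopicPerm {
    static final int PERM_WRITE = 2; // binary 010
    static final int PERM_READ = 4;  // binary 100

    static boolean isReadable(int perm) {
        return (perm & PERM_READ) == PERM_READ;
    }

    static boolean isWritable(int perm) {
        return (perm & PERM_WRITE) == PERM_WRITE;
    }

    static String describe(int perm) {
        if (isReadable(perm) && isWritable(perm)) return "WR";
        if (isReadable(perm)) return "R";
        if (isWritable(perm)) return "W";
        return "none";
    }
}
```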

2 Cluster-related commands

| Name | Meaning | Option | Description |
| --- | --- | --- | --- |
| clusterList | View cluster information: cluster, brokerName, brokerId, TPS, etc. | -m | Print more information (also prints #InTotalYest, #OutTotalYest, #InTotalToday, #OutTotalToday) |
| | | -h | Print help information |
| | | -n | NameServer address, in the format ip:port |
| | | -i | Print interval, in seconds |
| clusterRT | Send messages to measure the RT of each broker in the cluster; messages are sent to the ${BrokerName} topic | -a | amount, the number of messages per probe; RT = total time / amount |
| | | -s | Message size, in bytes |
| | | -c | Cluster to probe |
| | | -p | Whether to print a formatted log split with "\|"; not printed by default |
| | | -h | Print help information |
| | | -m | Data center (machine room) the cluster belongs to, used for printing |
| | | -i | Interval between sends, in seconds |
| | | -n | NameServer address, in the format ip:port |

3 Broker-related commands

| Name | Meaning | Option | Description |
| --- | --- | --- | --- |
| updateBrokerConfig | Update the broker configuration; the Broker.conf file is modified | -b | Broker address, in the format ip:port |
| | | -c | Cluster name |
| | | -k | Key |
| | | -v | Value |
| | | -h | Print help information |
| | | -n | NameServer address, in the format ip:port |
| brokerStatus | View broker statistics and running status (almost all the information you need is here) | -b | Broker address, in the format ip:port |
| | | -h | Print help information |
| | | -n | NameServer address, in the format ip:port |
| brokerConsumeStats | Show the consumption of each consumer on the broker, returning Consume Offset, Broker Offset, Diff, Timestamp, etc. per message queue | -b | Broker address, in the format ip:port |
| | | -t | Request timeout |
| | | -l | Diff threshold; only entries above it are printed |
| | | -o | Whether the topic is an ordered topic; usually false |
| | | -h | Print help information |
| | | -n | NameServer address, in the format ip:port |
| getBrokerConfig | Get the broker configuration | -b | Broker address, in the format ip:port |
| | | -n | NameServer address, in the format ip:port |
| wipeWritePerm | Revoke the broker's write permission on the NameServer | -b | BrokerName |
| | | -n | NameServer address, in the format ip:port |
| | | -h | Print help information |
| cleanExpiredCQ | Clean up expired consume queues on the broker; expired queues may appear if the number of queues is reduced manually | -n | NameServer address, in the format ip:port |
| | | -h | Print help information |
| | | -b | Broker address, in the format ip:port |
| | | -c | Cluster name |
| cleanUnusedTopic | Clean up unused topics on the broker and release their consume queues from memory; unused topics appear if a topic is deleted manually | -n | NameServer address, in the format ip:port |
| | | -h | Print help information |
| | | -b | Broker address, in the format ip:port |
| | | -c | Cluster name |
| sendMsgStatus | Send messages to the broker and return the send status and RT | -n | NameServer address, in the format ip:port |
| | | -h | Print help information |
| | | -b | BrokerName (note: not the broker address) |
| | | -s | Message size, in bytes |
| | | -c | Number of messages to send |

4 Message-related commands

| Name | Meaning | Option | Description |
| --- | --- | --- | --- |
| queryMsgById | Query a message by offsetMsgId. If you use the open-source console, use offsetMsgId. The command has further options; see QueryMsgByIdSubCommand for details | -i | msgId |
| | | -h | Print help information |
| | | -n | NameServer address, in the format ip:port |
| queryMsgByKey | Query messages by message key | -k | msgKey |
| | | -t | Topic name |
| | | -h | Print help information |
| | | -n | NameServer address, in the format ip:port |
| queryMsgByOffset | Query messages by offset | -b | Broker name (note: the broker name, not the broker address; broker names can be found with clusterList) |
| | | -i | ID of the queue to query |
| | | -o | Offset value |
| | | -t | Topic name |
| | | -h | Print help information |
| | | -n | NameServer address, in the format ip:port |
| queryMsgByUniqueKey | Query by msgId, which differs from offsetMsgId (see the common operations and maintenance problems for the difference). -g and -d are used together: after the message is found, the specified consumer is asked to consume it and the consumption result is returned | -h | Print help information |
| | | -n | NameServer address, in the format ip:port |
| | | -i | Unique message ID |
| | | -g | consumerGroup |
| | | -d | clientId |
| | | -t | Topic name |
| checkMsgSendRT | Measure the RT of sending messages to a topic; similar to clusterRT | -h | Print help information |
| | | -n | NameServer address, in the format ip:port |
| | | -t | Topic name |
| | | -a | Number of probes |
| | | -s | Message size |
| sendMessage | Send a message, either to a specific message queue or normally, depending on the options | -h | Print help information |
| | | -n | NameServer address, in the format ip:port |
| | | -t | Topic name |
| | | -p | body, the message body |
| | | -k | keys |
| | | -c | tags |
| | | -b | brokerName |
| | | -i | queueId |
| consumeMessage | Consume messages by offset, start and end timestamps, or message queue; different options trigger different consumption logic, see ConsumeMessageCommand | -h | Print help information |
| | | -n | NameServer address, in the format ip:port |
| | | -t | Topic name |
| | | -b | brokerName |
| | | -o | Start consuming from this offset |
| | | -i | queueId |
| | | -g | Consumer group |
| | | -s | Start timestamp; see -h for the format |
| | | -d | End timestamp |
| | | -c | Number of messages to consume |
| printMsg | Consume messages from the broker and print them, optionally within a time range | -h | Print help information |
| | | -n | NameServer address, in the format ip:port |
| | | -t | Topic name |
| | | -c | Character set, for example UTF-8 |
| | | -s | subExpress, the filter expression |
| | | -b | Start timestamp; see -h for the format |
| | | -e | End timestamp |
| | | -d | Whether to print the message body |
| printMsgByQueue | Similar to printMsg, but for a specified message queue | -h | Print help information |
| | | -n | NameServer address, in the format ip:port |
| | | -t | Topic name |
| | | -i | queueId |
| | | -a | brokerName |
| | | -c | Character set, for example UTF-8 |
| | | -s | subExpress, the filter expression |
| | | -b | Start timestamp; see -h for the format |
| | | -e | End timestamp |
| | | -p | Whether to print the messages |
| | | -d | Whether to print the message body |
| | | -f | Whether to count and print the number of messages per tag |
| resetOffsetByTime | Reset offsets by timestamp; both the broker and the consumer are reset | -h | Print help information |
| | | -n | NameServer address, in the format ip:port |
| | | -g | Consumer group |
| | | -t | Topic name |
| | | -s | Reset to the offset corresponding to this timestamp |
| | | -f | Whether to force the reset. If false, only rolling the offset back is supported; if true, the offset is reset regardless of how the timestamp's offset relates to the consume offset |
| | | -c | Whether to reset the offset of C++ clients |

5 Consumer and consumer group commands

| Name | Meaning | Option | Description |
| --- | --- | --- | --- |
| consumerProgress | View the consumption status of a subscription group, including the message backlog per client IP | -g | Consumer group name |
| | | -s | Whether to print client IPs |
| | | -h | Print help information |
| | | -n | NameServer address, in the format ip:port |
| consumerStatus | View consumer status: whether all consumers in the group have the same subscription, whether the process queue is backing up, and the consumers' jstack output. The output is extensive; see ConsumerStatusSubCommand | -h | Print help information |
| | | -n | NameServer address, in the format ip:port |
| | | -g | Consumer group |
| | | -i | clientId |
| | | -s | Whether to run jstack |
| getConsumerStatus | Get consumer consumption progress | -g | Consumer group name |
| | | -t | Topic to query |
| | | -i | IP address of the consumer client |
| | | -n | NameServer address, in the format ip:port |
| | | -h | Print help information |
| updateSubGroup | Create or update a subscription group | -n | NameServer address, in the format ip:port |
| | | -h | Print help information |
| | | -b | Broker address |
| | | -c | Cluster name |
| | | -g | Consumer group name |
| | | -s | Whether the group is allowed to consume |
| | | -m | Whether to start consuming from the minimum offset |
| | | -d | Whether the group uses broadcast mode |
| | | -q | Number of retry queues |
| | | -r | Maximum number of retries |
| | | -i | Effective only when slaveReadEnable is on: the brokerId to consume from while consuming from a slave has not yet been recommended; set it to a slave's brokerId to consume from that slave proactively |
| | | -w | If the broker recommends consuming from a slave, the brokerId of the slave to consume from, for example 1 |
| | | -a | Whether to notify other consumers to rebalance when the number of consumers changes |
| deleteSubGroup | Remove a subscription group from the broker | -n | NameServer address, in the format ip:port |
| | | -h | Print help information |
| | | -b | Broker address |
| | | -c | Cluster name |
| | | -g | Consumer group name |
| cloneGroupOffset | Apply the offsets of the source group to the target group | -n | NameServer address, in the format ip:port |
| | | -h | Print help information |
| | | -s | Source consumer group |
| | | -d | Target consumer group |
| | | -t | Topic name |
| | | -o | Not used yet |

6 Connection-related commands

| Name | Meaning | Option | Description |
| --- | --- | --- | --- |
| consumerConnection | Query the network connections of a consumer group | -g | Consumer group name |
| | | -n | NameServer address, in the format ip:port |
| | | -h | Print help information |
| producerConnection | Query the network connections of a producer group | -g | Producer group name |
| | | -t | Topic name |
| | | -n | NameServer address, in the format ip:port |
| | | -h | Print help information |

7 NameServer-related commands

| Name | Meaning | Option | Description |
| --- | --- | --- | --- |
| updateKvConfig | Update a key-value configuration of the NameServer (not currently used) | -s | Namespace |
| | | -k | Key |
| | | -v | Value |
| | | -n | NameServer address, in the format ip:port |
| | | -h | Print help information |
| deleteKvConfig | Delete a key-value configuration of the NameServer | -s | Namespace |
| | | -k | Key |
| | | -n | NameServer address, in the format ip:port |
| | | -h | Print help information |
| getNamesrvConfig | Get the NameServer configuration | -n | NameServer address, in the format ip:port |
| | | -h | Print help information |
| updateNamesrvConfig | Modify the NameServer configuration | -n | NameServer address, in the format ip:port |
| | | -h | Print help information |
| | | -k | Key |
| | | -v | Value |

8 Other commands

| Name | Meaning | Option | Description |
| --- | --- | --- | --- |
| startMonitoring | Start the monitoring process, which monitors accidentally deleted messages, retry queue message counts, etc. | -n | NameServer address, in the format ip:port |
| | | -h | Print help information |


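1 Use Cases

As services grow, a single machine can no longer meet performance and capacity requirements, so a service is split into finer-grained services or deployed as multiple instances that form a cluster. In distributed scenarios, RPC is the most common way to make synchronous service calls.

When building distributed applications, some domains, such as financial services, often use a message queue as a service bus to make synchronous service calls. The main uses of a message queue are decoupling and peak shaving; for synchronous calls, a service invocation must be abstracted as a message-based interaction, and this synchronous interaction logic must be strengthened. To better support message queues in synchronous call scenarios, rocketmq-4.7.0 introduced the "Request-Reply" feature to support RPC calls.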

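2 Design

In RocketMQ, a synchronous call consists of two processes:

(1) The requester produces a message, sends it to the responder, and waits for the responder's reply;

(2) After receiving the request message, the responder consumes it and sends a reply message back to the requester.

The whole process is essentially a combination of two message send/receive processes, so the key question is how to turn asynchronous message sending and receiving into a synchronous process. Two main problems need to be solved: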

2.1 How the requester waits synchronously for the reply

A key data structure in the solution to this problem is RequestResponseFuture.

public class RequestResponseFuture {
    private final String correlationId;            // identifies the request; key in the pending-request map
    private final RequestCallback requestCallback; // callback used by the asynchronous request variants
    private final long beginTimestamp = System.currentTimeMillis();
    private final Message requestMsg = null;
    private long timeoutMillis;
    private CountDownLatch countDownLatch = new CountDownLatch(1); // released when the reply arrives
    private volatile Message responseMsg = null;   // the reply message, once received
    private volatile boolean sendRequestOk = true;
    private volatile Throwable cause = null;
    // constructor and methods omitted
}

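In RequestResponseFuture, correlationId identifies a request. As shown in the accompanying figure, when the Producer sends a request it creates a RequestResponseFuture and stores it in a map, with the correlationId as the key and the RequestResponseFuture as the value. The request carries this correlationId; when the reply arrives, the producer uses the correlationId to look up the corresponding RequestResponseFuture and sets the reply content on it.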
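The bookkeeping described above can be sketched with only the JDK: a concurrent map from correlationId to a pending entry holding a CountDownLatch of 1, mirroring the fields of RequestResponseFuture shown earlier. PendingRequest and RequestTable are hypothetical names, messages are plain strings, and a second thread stands in for the reply path; this illustrates the mechanism and is not RocketMQ's code.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for RequestResponseFuture: one latch per request.
class PendingRequest {
    final String correlationId;
    private final CountDownLatch latch = new CountDownLatch(1);
    private volatile String response;

    PendingRequest(String correlationId) {
        this.correlationId = correlationId;
    }

    void complete(String reply) {
        response = reply;
        latch.countDown(); // wakes the requester blocked in await()
    }

    // Blocks until a reply arrives or the timeout expires; returns null on timeout.
    String await(long timeoutMillis) {
        try {
            latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt status for the caller
        }
        return response;
    }
}

// Hypothetical stand-in for the producer's correlationId -> future map.
class RequestTable {
    private final Map<String, PendingRequest> pending = new ConcurrentHashMap<>();

    // On send: register a future under a fresh correlationId carried by the request.
    PendingRequest register() {
        PendingRequest req = new PendingRequest(UUID.randomUUID().toString());
        pending.put(req.correlationId, req);
        return req;
    }

    // On reply: look the future up by correlationId and release its waiter.
    boolean onReply(String correlationId, String reply) {
        PendingRequest req = pending.remove(correlationId);
        if (req == null) {
            return false; // unknown, already answered, or timed out
        }
        req.complete(reply);
        return true;
    }

    // After a timeout: drop the entry so it does not leak.
    void forget(String correlationId) {
        pending.remove(correlationId);
    }
}
```

Removing the entry when the reply arrives, and when the wait times out, keeps the map from growing; in this sketch a reply whose entry is already gone is simply dropped.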

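2.2 How the consumer replies to the right requester after consuming the message

(1) When sending a message, the producer generates a unique identifier for each message and also attaches its own clientId. After the consumer receives and consumes the message, it takes the message identifier (correlationId) and the producer identifier (clientId) from the message and puts them into the reply message; they determine which request the reply answers and which producer it must be sent to. The reply message also carries the message type and the reply topic. The consumer then sends the reply message to the broker, as shown in the accompanying figure.

(2) After receiving the reply message, the broker must send it back to the specified producer. How does the broker know which producer that is? The message contains the producer's clientId, and ProducerManager maintains a mapping from client identifiers to channel information; with this mapping, the broker can send the reply to the corresponding producer.

Sending a reply message differs from ordinary message sending in that the producer does not pull the reply; the broker pushes it directly to the producer. The broker selection strategy also changes: the reply is sent to the same broker the request message came from.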
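The broker-side routing step can be sketched in the same way: a map from clientId to the producer's connection, playing the role of the ProducerManager mapping described above, and a direct push of the reply to the connection named in the reply. ReplyRouter and Connection are hypothetical stand-ins; a Connection here only records what it receives.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical connection: records pushed replies instead of writing to a socket.
class Connection {
    final List<String> received = new ArrayList<>();

    synchronized void push(String correlationId, String body) {
        received.add(correlationId + ":" + body);
    }
}

// Hypothetical broker-side router: clientId -> connection of that producer.
class ReplyRouter {
    private final Map<String, Connection> channelsByClientId = new ConcurrentHashMap<>();

    void register(String clientId, Connection conn) {
        channelsByClientId.put(clientId, conn);
    }

    // Pushes the reply to the producer named by replyToClientId.
    // Returns false when that producer is not (or no longer) connected.
    boolean pushReply(String replyToClientId, String correlationId, String body) {
        Connection conn = channelsByClientId.get(replyToClientId);
        if (conn == null) {
            return false;
        }
        conn.push(correlationId, body);
        return true;
    }
}
```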

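After the producer receives the reply message, it uses the unique identifier in the message to find the corresponding RequestResponseFuture in the map, sets the reply message on it, and decrements the counter. This releases the waiting thread, and the requester receives the reply.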

3 Usage

Examples of synchronous calls are in the rpc directory of the example folder.

3.1 Producer

// producer is a started DefaultMQProducer; ttl is the request timeout in milliseconds
Message msg = new Message(topic,
        "",
        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));

long begin = System.currentTimeMillis();
// request() blocks until the reply arrives or ttl expires
Message retMsg = producer.request(msg, ttl);
long cost = System.currentTimeMillis() - begin;
System.out.printf("request to <%s> cost: %d replyMessage: %s %n", topic, cost, retMsg);

Just replace the send call with request.

3.2 Consumer

A producer must also be started; when overriding the consumeMessage method, build the reply message and send it with that producer.

// replyProducer is a started DefaultMQProducer used only to send replies
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
    System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
    for (MessageExt msg : msgs) {
        try {
            System.out.printf("handle message: %s", msg.toString());
            String replyTo = MessageUtil.getReplyToClient(msg);
            byte[] replyContent = "reply message contents.".getBytes();
            // create reply message with given util, do not create reply message by yourself
            Message replyMessage = MessageUtil.createReplyMessage(msg, replyContent);

            // send reply message with producer
            SendResult replyResult = replyProducer.send(replyMessage, 3000);
            System.out.printf("reply to %s , %s %n", replyTo, replyResult.toString());
        } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
            e.printStackTrace();
        }
    }
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}

4 API Parameters

4.1 public Message request(Message msg, long timeout)

msg: the message to send

timeout: timeout of the synchronous call

4.2 public void request(Message msg, final RequestCallback requestCallback, long timeout)

msg: the message to send

requestCallback: callback function

timeout: request timeout

4.3 public Message request(final Message msg, final MessageQueueSelector selector, final Object arg, final long timeout)

msg: the message to send

selector: message queue selector

arg: argument required by the message queue selector

timeout: timeout of the synchronous call

4.4 public void request(final Message msg, final MessageQueueSelector selector, final Object arg, final RequestCallback requestCallback, final long timeout)

msg: the message to send

selector: message queue selector

arg: argument required by the message queue selector

requestCallback: callback function

timeout: request timeout

4.5 public Message request(final Message msg, final MessageQueue mq, final long timeout)

msg: the message to send

mq: target message queue

timeout: timeout of the synchronous call

4.6 public void request(final Message msg, final MessageQueue mq, final RequestCallback requestCallback, long timeout)

msg: the message to send

mq: target message queue

requestCallback: callback function

timeout: request timeout


1 Cluster Deployment

1.1 Single Master Mode

This mode is risky: once the Broker restarts or goes down, the whole service becomes unavailable. It is not recommended for production, but can be used for local testing.

1) Start the NameServer
### Start the Name Server first
$ nohup sh mqnamesrv &

### Verify that the Name Server started successfully
$ tail -f ~/logs/rocketmqlogs/namesrv.log
The Name Server boot success...
2) Start the Broker
### Start the Broker
$ nohup sh bin/mqbroker -n localhost:9876 &
### Verify that the Broker started successfully, for example with Broker IP 192.168.1.2 and name broker-a
$ tail -f ~/logs/rocketmqlogs/broker.log
The broker[broker-a, 192.168.1.2:10911] boot success...

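1.2 Multi-Master Mode

A cluster with no Slaves, only Masters, for example 2 or 3 Masters. The pros and cons of this mode are:

  • Pros: simple configuration; a single Master going down or restarting for maintenance does not affect applications. With disks configured as RAID10, even if a machine goes down and cannot be recovered, messages are not lost because RAID10 disks are very reliable (asynchronous flushing loses a small number of messages; synchronous flushing loses none). This mode has the highest performance;

  • Cons: while a single machine is down, the messages on it that have not been consumed cannot be subscribed to until the machine recovers, which affects message timeliness.

1) Start the NameServer

The NameServer must be started before the Brokers. In production, to ensure high availability, starting 3 NameServers is recommended for a typical cluster. The startup command is the same on every node:

### Start the Name Server first
$ nohup sh mqnamesrv &

### Verify that the Name Server started successfully
$ tail -f ~/logs/rocketmqlogs/namesrv.log
The Name Server boot success...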
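2) Start the Broker cluster
### On machine A, start the first Master (for example, the NameServer IP is 192.168.1.1)
$ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-noslave/broker-a.properties &

### On machine B, start the second Master (for example, the NameServer IP is 192.168.1.1)
$ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-noslave/broker-b.properties &
...

The commands above assume a single NameServer. For a cluster with multiple NameServers, separate the addresses after -n in the Broker startup command with semicolons, for example 192.168.1.1:9876;192.168.1.2:9876. Quote the list (-n '192.168.1.1:9876;192.168.1.2:9876'), because an unquoted semicolon ends the shell command.

1.3 Multi-Master Multi-Slave Mode - Asynchronous Replication

Each Master is configured with one Slave, forming multiple Master-Slave pairs. HA uses asynchronous replication, so there is a short message delay (milliseconds) between master and slave. The pros and cons of this mode are:

  • Pros: even if a disk is damaged, very few messages are lost, and message timeliness is not affected. After a Master goes down, consumers can still consume from the Slave; this is transparent to applications and requires no manual intervention. Performance is almost the same as in the Multi-Master mode;

  • Cons: if a Master goes down and its disk is damaged, a small number of messages are lost.

1) Start the NameServer
### Start the Name Server first
$ nohup sh mqnamesrv &

### Verify that the Name Server started successfully
$ tail -f ~/logs/rocketmqlogs/namesrv.log
The Name Server boot success...

2) Start the Broker cluster

### On machine A, start the first Master (for example, the NameServer IP is 192.168.1.1)
$ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-async/broker-a.properties &

### On machine B, start the second Master (for example, the NameServer IP is 192.168.1.1)
$ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-async/broker-b.properties &

### On machine C, start the first Slave (for example, the NameServer IP is 192.168.1.1)
$ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-async/broker-a-s.properties &

### On machine D, start the second Slave (for example, the NameServer IP is 192.168.1.1)
$ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-async/broker-b-s.properties &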

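1.4 Multi-Master Multi-Slave Mode - Synchronous Double Write

Each Master is configured with one Slave, forming multiple Master-Slave pairs. HA uses synchronous double write: success is returned to the application only after both master and slave have written the message. The pros and cons of this mode are:

  • Pros: neither data nor service has a single point of failure; when a Master goes down there is no message delay, and both service availability and data availability are very high;

  • Cons: performance is slightly lower than with asynchronous replication (about 10% lower), the RT of sending a single message is slightly higher, and in the current version the Slave cannot automatically switch to Master after the Master goes down.

1) Start the NameServer
### Start the Name Server first
$ nohup sh mqnamesrv &

### Verify that the Name Server started successfully
$ tail -f ~/logs/rocketmqlogs/namesrv.log
The Name Server boot success...
2) Start the Broker cluster
### On machine A, start the first Master (for example, the NameServer IP is 192.168.1.1)
$ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-a.properties &

### On machine B, start the second Master (for example, the NameServer IP is 192.168.1.1)
$ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-b.properties &

### On machine C, start the first Slave (for example, the NameServer IP is 192.168.1.1)
$ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-a-s.properties &

### On machine D, start the second Slave (for example, the NameServer IP is 192.168.1.1)
$ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-b-s.properties &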

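In the setups above, a Master and its Slave are paired by giving them the same BrokerName. The Master's BrokerId must be 0, and a Slave's BrokerId must be greater than 0. A Master can have multiple Slaves, which are distinguished by different BrokerIds. $ROCKETMQ_HOME is the RocketMQ installation directory; you must set this environment variable yourself.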
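These pairing rules can be checked mechanically before starting a cluster. The sketch below groups broker definitions by brokerName (the property set in files such as broker-a.properties and broker-a-s.properties) and reports groups with no Master (brokerId 0), duplicate brokerIds, or negative brokerIds. BrokerPairing and BrokerConf are hypothetical, and reading the .properties files is left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Sketch of the pairing rules: brokers with the same brokerName form one group,
// brokerId 0 is the Master, and each Slave needs a distinct brokerId > 0.
class BrokerPairing {
    // Hypothetical stand-in for the brokerName / brokerId properties of one broker.
    static final class BrokerConf {
        final String brokerName;
        final long brokerId;

        BrokerConf(String brokerName, long brokerId) {
            this.brokerName = brokerName;
            this.brokerId = brokerId;
        }
    }

    // Returns a list of problems; an empty list means the layout follows the rules.
    static List<String> validate(List<BrokerConf> brokers) {
        Map<String, List<Long>> idsByName = new TreeMap<>();
        for (BrokerConf b : brokers) {
            idsByName.computeIfAbsent(b.brokerName, k -> new ArrayList<>()).add(b.brokerId);
        }
        List<String> problems = new ArrayList<>();
        for (Map.Entry<String, List<Long>> e : idsByName.entrySet()) {
            List<Long> ids = e.getValue();
            if (!ids.contains(0L)) {
                problems.add(e.getKey() + ": no Master (brokerId 0)");
            }
            if (ids.stream().distinct().count() != ids.size()) {
                problems.add(e.getKey() + ": duplicate brokerId");
            }
            if (ids.stream().anyMatch(id -> id < 0)) {
                problems.add(e.getKey() + ": negative brokerId");
            }
        }
        return problems;
    }
}
```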

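2 mqadmin Management Tool

Note:

  1. Commands are executed as: ./mqadmin {command} {args}
  2. Almost all commands require -n, the NameServer address in the format ip:port
  3. Almost all commands print help with -h
  4. If both a Broker address (-b) and a clusterName (-c) are given, the command is executed against the Broker address; if no Broker address is given, the command is executed on all hosts in the cluster. Only one Broker address is supported. The format of -b is ip:port, and the default port is 10911
  5. Many commands can be found under tools, but only those initialized in MQAdminStartup can be used; you can modify this class to add or customize commands
  6. Because of version updates, a few commands may not have been updated in time; if you encounter an error, read the source code of the related command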

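2.1 Topic-related

| Name | Meaning | Option | Description |
| --- | --- | --- | --- |
| updateTopic | Create or update a topic's configuration | -b | Broker address, i.e. the Broker where the topic is located; only a single Broker is supported, in the format ip:port |
| | | -c | Cluster name, i.e. the cluster where the topic is located (clusters can be queried with clusterList) |
| | | -h | Print help |
| | | -n | NameServer address, in the format ip:port |
| | | -p | Read/write permission of the new topic (W=2, R=4, WR=6) |
| | | -r | Number of readable queues (default 8) |
| | | -w | Number of writable queues (default 8) |
| | | -t | Topic name (only characters matching ^[a-zA-Z0-9_-]+$ are allowed) |
| deleteTopic | Delete a topic | -c | Cluster name; the topic is deleted from the given cluster (clusters can be queried with clusterList) |
| | | -h | Print help |
| | | -n | NameServer address, in the format ip:port |
| | | -t | Topic name (only characters matching ^[a-zA-Z0-9_-]+$ are allowed) |
| topicList | View the topic list | -h | Print help |
| | | -c | Without -c, only the topic list is returned; with -c, clusterName, topic and consumerGroup are returned, i.e. the topic's cluster and subscriptions. Takes no value |
| | | -n | NameServer address, in the format ip:port |
| topicRoute | View a topic's routing information | -t | Topic name |
| | | -h | Print help |
| | | -n | NameServer address, in the format ip:port |
| topicStatus | View the offsets of a topic's message queues | -t | Topic name |
| | | -h | Print help |
| | | -n | NameServer address, in the format ip:port |
| topicClusterList | View the clusters a topic belongs to | -t | Topic name |
| | | -h | Print help |
| | | -n | NameServer address, in the format ip:port |
| updateTopicPerm | Update a topic's read/write permission | -t | Topic name |
| | | -h | Print help |
| | | -n | NameServer address, in the format ip:port |
| | | -b | Broker address, i.e. the Broker where the topic is located; only a single Broker is supported, in the format ip:port |
| | | -p | Read/write permission of the new topic (W=2, R=4, WR=6) |
| | | -c | Cluster name (clusters can be queried with clusterList); -b takes precedence, and without -b the command is executed on all Brokers in the cluster |
| updateOrderConf | Create, delete, or get the key-value configuration of a specific namespace on the NameServer; not yet enabled | -h | Print help |
| | | -n | NameServer address, in the format ip:port |
| | | -t | topic, the key |
| | | -v | orderConf, the value |
| | | -m | method; one of get, put, delete |
| allocateMQ | Use the average load algorithm to compute how a topic's message queues are allocated to a list of consumers | -t | Topic name |
| | | -h | Print help |
| | | -n | NameServer address, in the format ip:port |
| | | -i | ipList, comma-separated; computes how these IPs share the topic's message queues |
| statsAll | Print topic subscriptions, TPS, accumulated amounts, 24h read/write totals, etc. | -h | Print help |
| | | -n | NameServer address, in the format ip:port |
| | | -a | Whether to print only active topics |
| | | -t | Specify the topic |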

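2.2 Cluster-related

| Name | Meaning | Option | Description |
| --- | --- | --- | --- |
| clusterList | View cluster information: cluster, BrokerName, BrokerId, TPS, etc. | -m | Print more information (also prints #InTotalYest, #OutTotalYest, #InTotalToday, #OutTotalToday) |
| | | -h | Print help |
| | | -n | NameServer address, in the format ip:port |
| | | -i | Print interval, in seconds |
| clusterRT | Send messages to measure the RT of each Broker in the cluster; messages are sent to the ${BrokerName} topic | -a | amount, the total number of messages per probe; RT = total time / amount |
| | | -s | Message size, in bytes |
| | | -c | Cluster to probe |
| | | -p | Whether to print a formatted log split with "\|"; not printed by default |
| | | -h | Print help |
| | | -m | Data center (machine room) it belongs to, used when printing |
| | | -i | Send interval, in seconds |
| | | -n | NameServer address, in the format ip:port |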

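2.3 Broker-related

| Name | Meaning | Option | Description |
| --- | --- | --- | --- |
| updateBrokerConfig | Update the Broker configuration; Broker.conf is modified | -b | Broker address, in the format ip:port |
| | | -c | Cluster name |
| | | -k | Key |
| | | -v | Value |
| | | -h | Print help |
| | | -n | NameServer address, in the format ip:port |
| brokerStatus | View Broker statistics and running status (almost all the information you need is here) | -b | Broker address, in the format ip:port |
| | | -h | Print help |
| | | -n | NameServer address, in the format ip:port |
| brokerConsumeStats | Consumption of each consumer on the Broker; returns Consume Offset, Broker Offset, Diff, Timestamp, etc. per Message Queue | -b | Broker address, in the format ip:port |
| | | -t | Request timeout |
| | | -l | Diff threshold; only entries above it are printed |
| | | -o | Whether the topic is an ordered topic; usually false |
| | | -h | Print help |
| | | -n | NameServer address, in the format ip:port |
| getBrokerConfig | Get the Broker configuration | -b | Broker address, in the format ip:port |
| | | -n | NameServer address, in the format ip:port |
| wipeWritePerm | Revoke the Broker's write permission on the NameServer | -b | BrokerName |
| | | -n | NameServer address, in the format ip:port |
| | | -h | Print help |
| cleanExpiredCQ | Clean up expired Consume Queues on the Broker; expired queues may appear if the number of queues is reduced manually | -n | NameServer address, in the format ip:port |
| | | -h | Print help |
| | | -b | Broker address, in the format ip:port |
| | | -c | Cluster name |
| cleanUnusedTopic | Clean up unused topics on the Broker and release their Consume Queues from memory; unused topics appear if a topic is deleted manually | -n | NameServer address, in the format ip:port |
| | | -h | Print help |
| | | -b | Broker address, in the format ip:port |
| | | -c | Cluster name |
| sendMsgStatus | Send messages to the Broker and return the send status and RT | -n | NameServer address, in the format ip:port |
| | | -h | Print help |
| | | -b | BrokerName (note: different from the Broker address) |
| | | -s | Message size, in bytes |
| | | -c | Number of sends |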

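2.4 Message-related

| Name | Meaning | Option | Description |
| --- | --- | --- | --- |
| queryMsgById | Query a message by offsetMsgId. If you use the open-source console, use offsetMsgId. The command has other options; see QueryMsgByIdSubCommand for what they do | -i | msgId |
| | | -h | Print help |
| | | -n | NameServer address, in the format ip:port |
| queryMsgByKey | Query messages by message Key | -k | msgKey |
| | | -t | Topic name |
| | | -h | Print help |
| | | -n | NameServer address, in the format ip:port |
| queryMsgByOffset | Query messages by Offset | -b | Broker name (note: fill in the Broker's name, not its address; Broker names can be found with clusterList) |
| | | -i | ID of the queue to query |
| | | -o | Offset value |
| | | -t | Topic name |
| | | -h | Print help |
| | | -n | NameServer address, in the format ip:port |
| queryMsgByUniqueKey | Query by msgId, which differs from offsetMsgId (see Common Operations and Maintenance Problems). -g and -d are used together: after the message is found, the specified consumer is asked to consume it and the consumption result is returned | -h | Print help |
| | | -n | NameServer address, in the format ip:port |
| | | -i | Unique message ID |
| | | -g | consumerGroup |
| | | -d | clientId |
| | | -t | Topic name |
| checkMsgSendRT | Measure the RT of sending messages to a topic; similar to clusterRT | -h | Print help |
| | | -n | NameServer address, in the format ip:port |
| | | -t | Topic name |
| | | -a | Number of probes |
| | | -s | Message size |
| sendMessage | Send a message, either to a specific Message Queue or normally, depending on the options | -h | Print help |
| | | -n | NameServer address, in the format ip:port |
| | | -t | Topic name |
| | | -p | body, the message body |
| | | -k | keys |
| | | -c | tags |
| | | -b | BrokerName |
| | | -i | queueId |
| consumeMessage | Consume messages by offset, start and end timestamps, or message queue; different options run different consumption logic, see ConsumeMessageCommand | -h | Print help |
| | | -n | NameServer address, in the format ip:port |
| | | -t | Topic name |
| | | -b | BrokerName |
| | | -o | Start consuming from this offset |
| | | -i | queueId |
| | | -g | Consumer group |
| | | -s | Start timestamp; see -h for the format |
| | | -d | End timestamp |
| | | -c | Number of messages to consume |
| printMsg | Consume messages from the Broker and print them, optionally within a time range | -h | Print help |
| | | -n | NameServer address, in the format ip:port |
| | | -t | Topic name |
| | | -c | Character set, for example UTF-8 |
| | | -s | subExpress, the filter expression |
| | | -b | Start timestamp; see -h for the format |
| | | -e | End timestamp |
| | | -d | Whether to print the message body |
| printMsgByQueue | Similar to printMsg, but for a specified Message Queue | -h | Print help |
| | | -n | NameServer address, in the format ip:port |
| | | -t | Topic name |
| | | -i | queueId |
| | | -a | BrokerName |
| | | -c | Character set, for example UTF-8 |
| | | -s | subExpress, the filter expression |
| | | -b | Start timestamp; see -h for the format |
| | | -e | End timestamp |
| | | -p | Whether to print the messages |
| | | -d | Whether to print the message body |
| | | -f | Whether to count and print the number of messages per tag |
| resetOffsetByTime | Reset offsets by timestamp; both the Broker and the consumer are reset | -h | Print help |
| | | -n | NameServer address, in the format ip:port |
| | | -g | Consumer group |
| | | -t | Topic name |
| | | -s | Reset to the offset corresponding to this timestamp |
| | | -f | Whether to force the reset. If false, only rolling the offset back is supported; if true, the reset ignores how the timestamp's offset relates to consumeOffset |
| | | -c | Whether to reset the offset of C++ clients |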

2.5 消费者、消费组相关

名称含义命令选项说明
consumerProgress查看订阅组消费状态,可以查看具体的 client IP 的消息积累量-g消费者所属组名
-s是否打印 client IP
-h打印帮助
-nNameServer 服务地址,格式 ip:port
consumerStatus查看消费者状态,包括同一个分组中是否都是相同的订阅,分析 Process Queue 是否堆积,返回消费者 jstack 结果,内容较多,使用者参见 ConsumerStatusSubCommand-h打印帮助
-nNameServer 服务地址,格式 ip:port
-gconsumer group
-iclientId
-s是否执行 jstack
updateSubGroup更新或创建订阅关系-nNameServer 服务地址,格式 ip:port
-h打印帮助
-bBroker 地址
-c集群名称
-g消费者分组名称
-s分组是否允许消费
-m是否从最小 offset 开始消费
-d是否是广播模式
-q重试队列数量
-r最大重试次数
-i当 slaveReadEnable 开启时有效,且还未达到从 slave 消费时建议从哪个 BrokerId 消费,可以配置备机 id,主动从备机消费
-w如果 Broker 建议从 slave 消费,配置决定从哪个 slave 消费,配置 BrokerId,例如 1
-a当消费者数量变化时是否通知其他消费者负载均衡
deleteSubGroup从 Broker 删除订阅关系-nNameServer 服务地址,格式 ip:port
-h打印帮助
-bBroker 地址
-c集群名称
-g消费者分组名称
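cloneGroupOffset: Apply the source group's offsets to the target group.
  -n  NameServer address, in the format ip:port
  -h  Print help
  -s  Source consumer group
  -d  Target consumer group
  -t  Topic name
  -o  Not used yet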
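The message backlog that consumerProgress reports can be pictured as the per-queue gap between the Broker's latest offset and the group's committed consume offset. The sketch below sums that gap over a group's queues. ConsumerBacklog and QueueOffsets are hypothetical names. Reading the backlog this way is an assumption about how the command's Diff column is derived, not a copy of its code.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch: backlog = sum over queues of (broker offset - consumer offset).
final class ConsumerBacklog {

    /** Offsets of one message queue: the Broker's latest offset and the group's committed offset. */
    static final class QueueOffsets {
        final long brokerOffset;
        final long consumerOffset;

        QueueOffsets(long brokerOffset, long consumerOffset) {
            this.brokerOffset = brokerOffset;
            this.consumerOffset = consumerOffset;
        }

        long diff() {
            return brokerOffset - consumerOffset;
        }
    }

    private ConsumerBacklog() {
    }

    static long totalBacklog(List<QueueOffsets> queues) {
        long total = 0;
        for (QueueOffsets q : queues) {
            total += q.diff();
        }
        return total;
    }

    public static void main(String[] args) {
        List<QueueOffsets> queues = Arrays.asList(
                new QueueOffsets(120, 100),
                new QueueOffsets(80, 80));
        System.out.println(totalBacklog(queues)); // prints 20
    }
}
```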

2.6 Connections

Command: description, followed by its options (option  description)
consumerConnection: Query a Consumer's network connections.
  -g  Name of the group the consumer belongs to
  -n  NameServer address, in the format ip:port
  -h  Print help
producerConnection: Query a Producer's network connections.
  -g  Name of the group the producer belongs to
  -t  Topic name
  -n  NameServer address, in the format ip:port
  -h  Print help

2.7 NameServer

Command: description, followed by its options (option  description)
updateKvConfig: Update a NameServer kv config entry; not used yet.
  -s  Namespace
  -k  key
  -v  value
  -n  NameServer address, in the format ip:port
  -h  Print help
deleteKvConfig: Delete a NameServer kv config entry.
  -s  Namespace
  -k  key
  -n  NameServer address, in the format ip:port
  -h  Print help
getNamesrvConfig: Get the NameServer configuration.
  -n  NameServer address, in the format ip:port
  -h  Print help
updateNamesrvConfig: Modify the NameServer configuration.
  -n  NameServer address, in the format ip:port
  -h  Print help
  -k  key
  -v  value

2.8 Other

Command: description, followed by its options (option  description)
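startMonitoring: Start the monitoring process, which watches for mistakenly deleted messages, the number of messages in retry queues, and similar conditions.
  -n  NameServer address, in the format ip:port
  -h  Print help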

3 Common Operations Issues

3.1 Errors from RocketMQ's mqadmin command

Problem: Sometimes, after a RocketMQ cluster is deployed, running some mqadmin operations commands produces the following exception:

org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to <null> failed

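Solution: On the machine where the RocketMQ cluster is deployed, run export NAMESRV_ADDR=ip:9876 (where ip is the address of the machine that runs the cluster's NameServer component), then run the mqadmin commands again; they will return results.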
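mqadmin needs a NameServer address before it can connect. Besides exporting NAMESRV_ADDR, the commands listed above also accept -n. The minimal sketch below mirrors the lookup order from Client Configuration, with the rocketmq.namesrv.addr JVM property taking priority over the NAMESRV_ADDR environment variable. It also shows the case where neither is set, which is consistent with the connect to <null> error. NamesrvAddrResolver is a hypothetical helper, not part of RocketMQ.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper mirroring the client lookup order: JVM property first, then environment variable.
final class NamesrvAddrResolver {

    static final String PROPERTY = "rocketmq.namesrv.addr";
    static final String ENV = "NAMESRV_ADDR";

    private NamesrvAddrResolver() {
    }

    /** Returns the property value if set, else the env value; null means no address is configured. */
    static String resolve(String propertyValue, String envValue) {
        return propertyValue != null ? propertyValue : envValue;
    }

    static String resolveFromRuntime() {
        return resolve(System.getProperty(PROPERTY), System.getenv(ENV));
    }

    /** Splits "ip:port;ip:port" into individual addresses, skipping empty entries. */
    static List<String> split(String addrList) {
        List<String> addrs = new ArrayList<>();
        if (addrList == null) {
            return addrs;
        }
        for (String part : addrList.split(";")) {
            String addr = part.trim();
            if (!addr.isEmpty()) {
                addrs.add(addr);
            }
        }
        return addrs;
    }

    public static void main(String[] args) {
        String addr = resolveFromRuntime();
        if (addr == null) {
            System.out.println("No NameServer address: export NAMESRV_ADDR=ip:9876 or pass -n");
        } else {
            System.out.println("NameServers: " + split(addr));
        }
    }
}
```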

3.2 Consumption fails because the producer and consumer use different RocketMQ versions

Problem: Messages sent by the same producer can be consumed by consumer A but not by consumer B, and the RocketMQ Console shows the following exception message:

Not found the consumer group consume stats, because return offset table is empty, maybe the consumer not consume any message

Solution: The producer and consumer should use the same version of the RocketMQ jars, such as rocketmq-client.

3.3 A newly added consumer group of a topic cannot consume historical messages

Problem: When a new consumer group is started for an existing topic, it consumes messages from the current offset and does not receive historical messages.

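Solution: By default, RocketMQ starts consuming from the tail of the message queue, that is, it skips historical messages. To consume historical messages, set org.apache.rocketmq.client.consumer.DefaultMQPushConsumer#setConsumeFromWhere. The three commonly used settings are: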

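  • Default: on its first start, a new subscription group consumes from the last position of the queue; on later starts it resumes from its previous consumption progress, so historical messages are skipped;
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
  • On its first start, a new subscription group consumes from the first position of the queue; on later starts it resumes from its previous consumption progress, so it consumes the historical messages that have not yet expired on the Broker;
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
  • On its first start, a new subscription group consumes from a specified point in time; on later starts it resumes from its previous consumption progress. Use it together with consumer.setConsumeTimestamp(); the default timestamp is half an hour ago.
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);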
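CONSUME_FROM_TIMESTAMP is used together with consumer.setConsumeTimestamp(), which takes a second-precision timestamp string of the form yyyyMMddHHmmss (for example 20131223171201). The hypothetical helper below builds such strings, including the half-hour-ago default. It assumes the timestamp is interpreted in the consumer JVM's local time.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

// Hypothetical helper that builds values for consumer.setConsumeTimestamp(...).
// Assumes the yyyyMMddHHmmss format in the consumer's local time.
final class ConsumeTimestamps {

    static final DateTimeFormatter FORMAT = DateTimeFormatter.ofPattern("yyyyMMddHHmmss");

    private ConsumeTimestamps() {
    }

    static String format(LocalDateTime time) {
        return FORMAT.format(time);
    }

    /** The documented default: half an hour before the given time. */
    static String halfHourBefore(LocalDateTime now) {
        return format(now.minusMinutes(30));
    }

    public static void main(String[] args) {
        // Intended use with a DefaultMQPushConsumer (requires rocketmq-client):
        //   consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
        //   consumer.setConsumeTimestamp(ConsumeTimestamps.format(LocalDateTime.now().minusDays(1)));
        System.out.println(format(LocalDateTime.of(2013, 12, 23, 17, 12, 1))); // prints 20131223171201
    }
}
```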

3.4 How to enable reading from the Slave

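In some cases a Consumer needs to reset its consumption offset to one or two days earlier. On a Master Broker with limited memory, the CommitLog then comes under heavy IO pressure, which slows reads and writes of other messages on that Broker. You can enable slaveReadEnable=true. The Master Broker then compares the amount of data between the Consumer's offset and the latest CommitLog offset with a percentage of the machine's memory (accessMessageInMemoryMaxRatio=40%). If the data exceeds that limit, it recommends that the Consumer read from the Slave Broker, which reduces the Master Broker's IO.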
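The slave-read rule above reduces to a single comparison. This is a simplified sketch, not the Broker's code. SlaveReadAdvisor and its parameter names are hypothetical, and accessMessageInMemoryMaxRatio is treated as a percentage (40 by default according to this section).

```java
// Simplified, hypothetical sketch of the slave-read suggestion in 3.4; not the Broker's implementation.
final class SlaveReadAdvisor {

    private SlaveReadAdvisor() {
    }

    /**
     * @param slaveReadEnable               Broker setting slaveReadEnable
     * @param maxCommitLogOffset            latest physical offset in the CommitLog
     * @param pullPhysicalOffset            physical offset the Consumer is reading from
     * @param totalPhysicalMemory           machine memory in bytes
     * @param accessMessageInMemoryMaxRatio percentage of memory, 40 by default
     */
    static boolean suggestPullFromSlave(boolean slaveReadEnable, long maxCommitLogOffset,
                                        long pullPhysicalOffset, long totalPhysicalMemory,
                                        int accessMessageInMemoryMaxRatio) {
        if (!slaveReadEnable) {
            return false;
        }
        // Bytes between the Consumer's position and the newest CommitLog data
        long lag = maxCommitLogOffset - pullPhysicalOffset;
        // Share of machine memory the data still to be read may occupy
        long memoryBudget = (long) (totalPhysicalMemory * (accessMessageInMemoryMaxRatio / 100.0));
        return lag > memoryBudget;
    }

    public static void main(String[] args) {
        long gib = 1L << 30;
        // 16 GiB machine, Consumer 8 GiB behind: 8 GiB > 6.4 GiB, so read from the Slave
        System.out.println(suggestPullFromSlave(true, 100 * gib, 92 * gib, 16 * gib, 40)); // prints true
    }
}
```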

3.5 Performance tuning

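Use a spin lock with asynchronous flushing and a reentrant lock with synchronous flushing. The lock is selected by the Broker option useReentrantLockWhenPutMessage, which defaults to false (spin lock). With asynchronous flushing, enabling transientStorePoolEnable is recommended. Disabling transferMsgByHeap is recommended to make message pulling more efficient. With synchronous flushing, increase sendMessageThreadPoolNums appropriately. Determine the exact values through load testing.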
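The recommendations above can be collected into broker.conf entries for a given flush mode, as the hypothetical helper below does. flushDiskType (ASYNC_FLUSH or SYNC_FLUSH) is the Broker setting that selects the flush mode; it is not named in the text above and is included as an assumption. sendMessageThreadPoolNums is left out on purpose because its value should come from load testing.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper restating the 3.5 recommendations as broker.conf entries.
final class BrokerTuning {

    enum FlushDiskType { ASYNC_FLUSH, SYNC_FLUSH }

    private BrokerTuning() {
    }

    static Map<String, String> recommended(FlushDiskType flushType) {
        Map<String, String> conf = new LinkedHashMap<>();
        boolean sync = flushType == FlushDiskType.SYNC_FLUSH;
        conf.put("flushDiskType", flushType.name());
        // Spin lock (false) for async flush, reentrant lock (true) for sync flush
        conf.put("useReentrantLockWhenPutMessage", String.valueOf(sync));
        if (!sync) {
            // Recommended for async flush only
            conf.put("transientStorePoolEnable", "true");
        }
        // Recommended off to make pulling more efficient
        conf.put("transferMsgByHeap", "false");
        // sendMessageThreadPoolNums omitted: size it from load tests (larger for sync flush)
        return conf;
    }

    /** Renders the entries in key=value form, one per line. */
    static String toProperties(Map<String, String> conf) {
        StringBuilder sb = new StringBuilder();
        conf.forEach((key, value) -> sb.append(key).append('=').append(value).append('\n'));
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.print(toProperties(recommended(FlushDiskType.ASYNC_FLUSH)));
    }
}
```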

3.6 Meaning of, and difference between, msgId and offsetMsgId in RocketMQ

After a RocketMQ producer client sends a message, you will usually see a log line like the following:

SendResult [sendStatus=SEND_OK, msgId=0A42333A0DC818B4AAC246C290FD0000, offsetMsgId=0A42333A00002A9F000000000134F1F5, messageQueue=MessageQueue [topic=topicTest1, BrokerName=mac.local, queueId=3], queueOffset=4]
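  • msgId: generated on the client side by the producer instance; specifically, the unique Id is produced by calling MessageClientIDSetter.createUniqIDBuffer();
  • offsetMsgId: generated by the Broker when it writes the message, by concatenating the "IP address + port" with the "physical offset in the CommitLog" into one string. offsetMsgId is the messageId you enter directly in the RocketMQ console to query a message.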
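Because offsetMsgId is built from the Broker address and the CommitLog offset, it can be decoded back into those parts. The sketch below assumes an IPv4 Broker, for which offsetMsgId is 16 bytes written as hex: 4 bytes of IP, 4 bytes of port and 8 bytes of CommitLog physical offset. OffsetMsgIdDecoder is a hypothetical helper, not a RocketMQ API.

```java
// Hypothetical decoder for an IPv4 offsetMsgId: hex of 4-byte IP + 4-byte port + 8-byte CommitLog offset.
final class OffsetMsgIdDecoder {

    static final class Decoded {
        final String brokerAddr;
        final long commitLogOffset;

        Decoded(String brokerAddr, long commitLogOffset) {
            this.brokerAddr = brokerAddr;
            this.commitLogOffset = commitLogOffset;
        }
    }

    private OffsetMsgIdDecoder() {
    }

    static Decoded decode(String offsetMsgId) {
        if (offsetMsgId.length() != 32) {
            throw new IllegalArgumentException("expected 32 hex characters for an IPv4 offsetMsgId");
        }
        // Bytes 0-3: IPv4 address, one hex pair per octet
        StringBuilder ip = new StringBuilder();
        for (int i = 0; i < 8; i += 2) {
            if (i > 0) {
                ip.append('.');
            }
            ip.append(Integer.parseInt(offsetMsgId.substring(i, i + 2), 16));
        }
        // Bytes 4-7: Broker port
        int port = Integer.parseInt(offsetMsgId.substring(8, 16), 16);
        // Bytes 8-15: physical offset in the CommitLog
        long offset = Long.parseUnsignedLong(offsetMsgId.substring(16, 32), 16);
        return new Decoded(ip + ":" + port, offset);
    }

    public static void main(String[] args) {
        Decoded d = decode("0A42333A00002A9F000000000134F1F5");
        System.out.println(d.brokerAddr + " " + d.commitLogOffset); // prints 10.66.51.58:10911 20247029
    }
}
```

Applied to the offsetMsgId in the log line above, the sketch yields Broker 10.66.51.58:10911 and CommitLog offset 20247029.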


Preface

This document describes how to quickly build and deploy a DLedger-based RocketMQ cluster that can fail over automatically.

For detailed guides on deploying a new cluster and upgrading an existing one, see the Deployment Guide.

1. Building from source

The build has two parts: build DLedger first, then build RocketMQ.

1.1 Build DLedger

git clone https://github.com/openmessaging/openmessaging-storage-dledger.git

cd openmessaging-storage-dledger

mvn clean install -DskipTests

1.2 Build RocketMQ

git clone https://github.com/apache/rocketmq.git

cd rocketmq

git checkout -b store_with_dledger origin/store_with_dledger

mvn -Prelease-all -DskipTests clean install -U

2. Quick deployment

After the build succeeds:

cd distribution/target/apache-rocketmq

sh bin/dledger/fast-try.sh start

If the steps above succeed, you can check the cluster status with the mqadmin operations command:

sh bin/mqadmin clusterList -n 127.0.0.1:9876

If all goes well, you will see output like the following:

[Figure: ClusterList output]

(BID 0 denotes the Master; all other nodes are Followers.)

The cluster has started. You can now send and receive messages and run failover tests.

To shut down the quick-start cluster, run:

sh bin/dledger/fast-try.sh stop

For the quick deployment, the default configuration is in conf/dledger and the default storage path is /tmp/rmqstore.

3. Failover

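Once the deployment succeeds, kill the Leader (in the example above, the process listening on port 30931) and wait about 10 s. Check the cluster with the clusterList command, and you will see that the Leader has switched to another node.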