0%

Technology Architecture

The RocketMQ architecture is divided into four parts, as shown in the figure above:

  • Producer:The role of message publishing supports distributed cluster mode deployment. Producer selects the corresponding Broker cluster queue for message delivery through MQ's load balancing module. The delivery process supports fast failure and low latency.

  • Consumer:The role of message consumption supports distributed cluster deployment. Support push, pull two modes to consume messages. It also supports cluster mode and broadcast mode consumption, and it provides a real-time message subscription mechanism to meet the needs of most users.

  • NameServer:NameServer is a very simple Topic routing registry with a role similar to ZooKeeper in Dubbo, which supports dynamic registration and discovery of Broker. It mainly includes two functions: Broker management, NameServer accepts the registration information of the Broker cluster and saves it as the basic data of the routing information. Then provide a heartbeat detection mechanism to check whether the broker is still alive; routing information management, each NameServer will save the entire routing information about the Broker cluster and the queue information for the client query. Then the Producer and Consumer can know the routing information of the entire Broker cluster through the NameServer, so as to deliver and consume the message. The NameServer is usually deployed in a cluster mode, and each instance does not communicate with each other. Broker registers its own routing information with each NameServer, so each NameServer instance stores a complete routing information. When a NameServer is offline for some reason, the Broker can still synchronize its routing information with other NameServers. The Producer and Consumer can still dynamically sense the information of the Broker's routing.

  • BrokerServer:Broker is responsible for the storage, delivery and query of messages and high availability guarantees. In order to achieve these functions, Broker includes the following important sub-modules.

  1. Remoting Module:The entire broker entity handles requests from the clients side.
  2. Client Manager:Topic subscription information for managing the client (Producer/Consumer) and maintaining the Consumer
  3. Store Service:Provides a convenient and simple API interface for handling message storage to physical hard disks and query functions.
  4. HA Service:Highly available service that provides data synchronization between Master Broker and Slave Broker.
  5. Index Service:The message delivered to the Broker is indexed according to a specific Message key to provide a quick query of the message.

Deployment architecture

RocketMQ Network deployment features

  • NameServer is an almost stateless node that can be deployed in a cluster without any information synchronization between nodes.

  • The broker deployment is relatively complex. The Broker is divided into the Master and the Slave. One Master can correspond to multiple Slaves. However, one Slave can only correspond to one Master. The correspondence between the Master and the Slave is defined by specifying the same BrokerName and different BrokerId. The BrokerId is 0. Indicates Master, non-zero means Slave. The Master can also deploy multiple. Each broker establishes a long connection with all nodes in the NameServer cluster, and periodically registers Topic information to all NameServers. Note: The current RocketMQ version supports a Master Multi Slave on the deployment architecture, but only the slave server with BrokerId=1 will participate in the read load of the message.

  • The Producer establishes a long connection with one of the nodes in the NameServer cluster (randomly selected), periodically obtains Topic routing information from the NameServer, and establishes a long connection to the Master that provides the Topic service, and periodically sends a heartbeat to the Master. Producer is completely stateless and can be deployed in a cluster.

  • The Consumer establishes a long connection with one of the nodes in the NameServer cluster (randomly selected), periodically obtains Topic routing information from the NameServer, and establishes a long connection to the Master and Slave that provides the Topic service, and periodically sends heartbeats to the Master and Slave. The Consumer can subscribe to the message from the Master or subscribe to the message from the Slave. When the consumer pulls the message to the Master, the Master server will generate a read according to the distance between the offset and the maximum offset. I/O), and whether the server is readable or not, the next time it is recommended to pull from the Master or Slave.

Describe the cluster workflow in conjunction with the deployment architecture diagram:

  • Start the NameServer, listen to the port after the NameServer, and wait for the Broker, Producer, and Consumer to connect, which is equivalent to a routing control center.
  • The Broker starts, keeps a long connection with all NameServers, and sends heartbeat packets periodically. The heartbeat packet contains the current broker information (IP+ port, etc.) and stores all Topic information. After the registration is successful, there is a mapping relationship between Topic and Broker in the NameServer cluster.
  • Before sending and receiving a message, create a Topic. When creating a Topic, you need to specify which Brokers the Topic should be stored on, or you can automatically create a Topic when sending a message.
  • Producer sends a message. When starting, it first establishes a long connection with one of the NameServer clusters, and obtains from the NameServer which Brokers are currently sent by the Topic. Polling selects a queue from the queue list and then establishes with the broker where the queue is located. Long connection to send a message to the broker.
  • The Consumer is similar to the Producer. It establishes a long connection with one of the NameServers, obtains which Brokers the current Topic exists on, and then directly establishes a connection channel with the Broker to start consuming messages.

1 RocketMQ's mqadmin command error.

Problem: Sometimes after deploying the RocketMQ cluster, when you try to execute some commands of "mqadmin", the following exception will appear:

org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to  failed

Solution: Execute export NAMESRV_ADDR=ip:9876 (ip refers to the address of NameServer deployed in the cluster) on the VM that deploys the RocketMQ cluster.Then you will execute commands of "mqadmin" successfully.

2 The inconsistent version of RocketMQ between the producer and consumer leads to the problem that message can't be consumed normally.

Problem: The same producer sends a message, consumer A can consume, but consumer B can't consume, and the RocketMQ Console appears:

Not found the consumer group consume stats, because return offset table is empty, maybe the consumer not consume any message

Solution: The jar package of RocketMQ, such as rocketmq-client, should be the same version on the consumer and producer.

3 When adding a new topic consumer group, historical messages can't be consumed.

Problem: When a new consumer group of the same topic is started, the consumed message is the current offset message, and the historical message is not obtained.

Solution: The default policy of rocketmq is to start from the end of the message queue and skip the historical message. If you want to consume historical message, you need to set:

org.apache.rocketmq.client.consumer.DefaultMQPushConsumer#setConsumeFromWhere

There are three common configurations:

  • By default, a new subscription group starts to consume from the end of the queue for the first time, and then restarts and continue to consume from the last consume position, that is, to skip the historical message.
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
  • A new subscription group starts to consume from the head of the queue for the first time, and then restarts and continue to consume from the last consume position, that is, to consume the historical message that is not expired on Broker.
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
  • A new subscription group starts to consume from the specified time point for the first time, and then restarts and continue to consume from the last consume position. It is used together with consumer.setConsumeTimestamp(). The default is half an hour ago.
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);

4 How to enable reading data from Slave

In some cases, the Consumer needs to reset the consume position to 1-2 days ago. At this time, on the Master Broker with limited memory, the CommitLog will carry a relatively heavy IO pressure, affecting the reading and writing of other messages on that Broker. You can enable slaveReadEnable=true. When Master Broker finds that the difference between the Consumer's consume position and the latest value of CommitLog exceeds the percentage of machine's memory (accessMessageInMemoryMaxRatio=40%), it will recommend Consumer to read from Slave Broker and relieve Master Broker's IO.

5 Performance

Asynchronous flush disk is recommended to use spin lock.

Synchronous flush disk is recommended to use reentrant lock. Adjust the Broker configuration item useReentrantLockWhenPutMessage, and the default value is false.

Asynchronous flush disk is recommended to open TransientStorePoolEnable and close transferMsgByHeap to improve the efficiency of pulling message;

Synchronous flush disk is recommended to increase the sendMessageThreadPoolNums appropriately. The specific configuration needs to be tested.

6 The meaning and difference between msgId and offsetMsgId in RocketMQ

After sending message with RocketMQ, you will usually see the following log:

SendResult [sendStatus=SEND_OK, msgId=0A42333A0DC818B4AAC246C290FD0000, offsetMsgId=0A42333A00002A9F000000000134F1F5, messageQueue=MessageQueue [topic=topicTest1, BrokerName=mac.local, queueId=3], queueOffset=4]
  • msgId,for the client, the msgId is generated by the producer instance. Specifically, the method MessageClientIDSetter.createUniqIDBuffer() is called to generate a unique Id.
  • offsetMsgId, offsetMsgId is generated by the Broker server when writing a message ( string concating "IP address + port" and "CommitLog's physical offset address"), and offsetMsgId is the messageId used to query in the RocketMQ console.

3.1 Broker Role

Broker Role is ASYNC_MASTER, SYNC_MASTER or SLAVE. If you cannot tolerate message missing, we suggest you deploy SYNC_MASTER and attach a SLAVE to it. If you feel ok about missing, but you want the Broker to be always available, you may deploy ASYNC_MASTER with SLAVE. If you just want to make it easy, you may only need a ASYNC_MASTER without SLAVE.

3.2 FlushDiskType

ASYNC_FLUSH is recommended, for SYNC_FLUSH is expensive and will cause too much performance loss. If you want reliability, we recommend you use SYNC_MASTER with SLAVE.

3.3 Broker Configuration

Parameter nameDefaultDescription
listenPort10911listen port for client
namesrvAddrnullname server address
brokerIP1InetAddress for network interfaceShould be configured if having multiple addresses
brokerIP2InetAddress for network interfaceIf configured for the Master broker in the Master/Slave cluster, slave broker will connect to this port for data synchronization
brokerNamenullbroker name
brokerClusterNameDefaultClusterthis broker belongs to which cluster
brokerId0broker id, 0 means master, positive integers mean slave
storePathCommitLog$HOME/store/commitlog/file path for commit log
storePathConsumerQueue$HOME/store/consumequeue/file path for consume queue
mappedFileSizeCommitLog1024 1024 1024(1G)mapped file size for commit log
deleteWhen04When to delete the commitlog which is out of the reserve time
fileReserverdTime72The number of hours to keep a commitlog before deleting it
brokerRoleASYNC_MASTERSYNC_MASTER/ASYNC_MASTER/SLAVE
flushDiskTypeASYNC_FLUSH{SYNC_FLUSH/ASYNC_FLUSH}. Broker of SYNC_FLUSH mode flushes each message onto disk before acknowledging producer. Broker of ASYNC_FLUSH mode, on the other hand, takes advantage of group-committing, achieving better performance.

1 Subscribe and Publish

Message publication refers to that a producer sends messages to a topic; Message subscription means a consumer follows a topic with certain tags and then consumes data from that topic.

2 Message Ordering

Message ordering refers to that a group of messages can be consumed orderly as they are published. For example, an order generates three messages: order creation, order payment, and order completion. It only makes sense to consume them in their generated order, but orders can be consumed in parallel at the same time. RocketMQ can strictly guarantee these messages are in order.

Orderly message is divided into global orderly message and partitioned orderly message. Global order means that all messages under a certain topic must be in order, partitioned order only requires each group of messages are consumed orderly.

  • Global message ordering:

    For a given Topic, all messages are published and consumed in strict first-in-first-out (FIFO) order.

    Applicable scenario: the performance requirement is not high, and all messages are published and consumed according to FIFO principle strictly.

  • Partitioned message ordering:

    For a given Topic, all messages are partitioned according to sharding key. Messages within the same partition are published and consumed in strict FIFO order. Sharding key is the key field to distinguish message’s partition, which is a completely different concept from the key of ordinary messages.

    Applicable scenario: high performance requirement, with sharding key as the partition field, messages within the same partition are published and consumed according to FIFO principle strictly.

3 Message Filter

Consumers of RocketMQ can filter messages based on tags as well as supporting for user-defined attribute filtering. Message filter is currently implemented on the Broker side, with the advantage of reducing the network transmission of useless messages for Consumer and the disadvantage of increasing the burden on the Broker and relatively complex implementation.

4 Message Reliability

RocketMQ supports high reliability of messages in several situations:

1 Broker shutdown normally

2 Broker abnormal crash

3 OS Crash

4 The machine is out of power, but it can be recovered immediately

5 The machine cannot be started up (the CPU, motherboard, memory and other key equipment may be damaged)

6 Disk equipment damaged

In the four cases of 1), 2), 3), and 4) where the hardware resource can be recovered immediately, RocketMQ guarantees that the message will not be lost or a small amount of data will be lost (depending on whether the flush disk type is synchronous or asynchronous).

5 ) and 6) are single point of failure and cannot be recovered. Once it happens, all messages on the single point will be lost. In both cases, RocketMQ ensures that 99% of the messages are not lost through asynchronous replication, but a very few number of messages may still be lost. Synchronous double write mode can completely avoid single point of failure, which will surely affect the performance and suitable for the occasion of high demand for message reliability, such as money related applications. Note: RocketMQ supports synchronous double writes since version 3.0.

5 At Least Once

At least Once refers to that every message will be delivered at least once. RocketMQ supports this feature because the Consumer pulls the message locally and does not send an ack back to the server until it has consumed it.

6 Backtracking Consumption

Backtracking consumption refers to that the Consumer has consumed the message successfully, but the business needs to consume again. To support this function, the message still needs to be retained after the Broker sends the message to the Consumer successfully. The re-consumption is normally based on time dimension. For example, after the recovery of the Consumer system failured, the data one hour ago needs to be re-consumed, then the Broker needs to provide a mechanism to reverse the consumption progress according to the time dimension. RocketMQ supports backtracking consumption by time trace, with the time dimension down to milliseconds.

7 Transactional Message

RocketMQ transactional message refers to the fact that the application of a local transaction and the sending of a Message operation can be defined in a global transaction which means both succeed or failed simultaneously. RocketMQ transactional message provides distributed transaction functionality similar to X/Open XA, enabling the ultimate consistency of distributed transactions through transactional message.

8 Scheduled Message

Scheduled message(delay queue) refers to that messages are not consumed immediately after they are sent to the broker, but waiting to be delivered to the real topic after a specific time.

The broker has a configuration item, messageDelayLevel, with default values “1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”, 18 levels. Users can configure a custom messageDelayLevel. Note that messageDelayLevel is a broker’s property rather than a topic’s. When sending a message, just set the delayLevel level: msg.setDelayLevel(level). There are three types of levels:

  • level == 0, The message is not a delayed message
  • 1<=level<=maxLevel, Message delay specific time, such as level==1, delay for 1s
  • level > maxLevel, than level== maxLevel, such as level==20, delay for 2h

Scheduled messages are temporarily saved in a topic named SCHEDULE_TOPIC_XXXX, and saved in a specific queue according to delayTimeLevel, queueId = delayTimeLevel - 1, that is, only messages with the same delay are saved in a queue, ensuring that messages with the same sending delay can be consumed orderly. The broker consumes SCHEDULE_TOPIC_XXXX on schedule and writes messages to the real topic.

Note that Scheduled messages are counted both the first time they are written and the time they are scheduled to be written to the real topic, so both the send number and the TPS are increased.

9 Message Retry

When the Consumer fails to consume the message, a retry mechanism is needed to make the message to be consumed again. Consumer's consume failure can usually be classified as follows:

  • Due to the reasons of the message itself, such as deserialization failure, the message data itself cannot be processed (for example, the phone number of the current message is cancelled and cannot be charged), etc. This kind of error usually requires skipping this message and consuming others since immediately retry would be failed 99%, so it is better to provide a timed retry mechanism that retries after 10 seconds.
  • Due to the reasons of dependent downstream application services are not available, such as db connection is not usable, perimeter network is not unreachable, etc. When this kind of error is encountered, consuming other messages will also result in an error even if the current failed message is skipped. In this case, it is recommended to sleep for 30s before consuming the next message, which will reduce the pressure on the broker to retry the message.

RocketMQ will set up a retry queue named “%RETRY%+consumerGroup” for each consumer group(Note that the retry queue for this topic is for consumer groups, not for each topic) to temporarily save messages cannot be consumed by customer due to all kinds of reasons. Considering that it takes some time for the exception to recover, multiple retry levels are set for the retry queue, and each retry level has a corresponding re-deliver delay. The more retries, the greater the deliver delay. RocketMQ first save retry messages to the delay queue which topic is named “SCHEDULE_TOPIC_XXXX”, then background schedule task will save the messages to “%RETRY%+consumerGroup” retry queue according to their corresponding delay.

10 Message Resend

When a producer sends a message, the synchronous message will be resent if fails, the asynchronous message will retry and oneway message is without any guarantee. Message resend ensures that messages are sent successfully and without lost as much as possible, but it can lead to message duplication, which is an unavoidable problem in RocketMQ. Under normal circumstances, message duplication will not occur, but when there is a large number of messages and network jitter, message duplication will be a high-probability event. In addition, producer initiative messages resend and the consumer load changes will also result in duplicate messages. The message retry policy can be set as follows:

  • retryTimesWhenSendFailed: Synchronous message retry times when send failed, default value is 2, so the producer will try to send retryTimesWhenSendFailed + 1 times at most. To ensure that the message is not lost, producer will try sending the message to another broker instead of selecting the broker that failed last time. An exception will be thrown if it reaches the retry limit, and the client should guarantee that the message will not be lost. Messages will resend when RemotingException, MQClientException, and partial MQBrokerException occur.
  • retryTimesWhenSendAsyncFailed: Asynchronous message retry times when send failed, asynchronous retry sends message to the same broker instead of selecting another one and does not guarantee that the message wont lost.
  • retryAnotherBrokerWhenNotStoreOK: Message flush disk (master or slave) timeout or slave not available (return status is not SEND_OK), whether to try to send to another broker, default value is false. Very important messages can set to true.

11 Flow Control

Producer flow control, because broker processing capacity reaches a bottleneck; Consumer flow control, because the consumption capacity reaches a bottleneck.

Producer flow control:

  • When commitLog file locked time exceeds osPageCacheBusyTimeOutMills, default value of osPageCacheBusyTimeOutMills is 1000 ms, then return flow control.
  • If transientStorePoolEnable == true, and the broker is asynchronous flush disk type, and resources are insufficient in the transientStorePool, reject the current send request and return flow control.
  • The broker checks the head request wait time of the send request queue every 10ms. If the wait time exceeds waitTimeMillsInSendQueue, which default value is 200ms, the current send request is rejected and the flow control is returned.
  • The broker implements flow control by rejecting send requests.

Consumer flow control:

  • When consumer local cache messages number exceeds pullThresholdForQueue, default value is 1000.
  • When consumer local cache messages size exceeds pullThresholdSizeForQueue, default value is 100MB.
  • When consumer local cache messages span exceeds consumeConcurrentlyMaxSpan, default value is 2000.

The result of consumer flow control is to reduce the pull frequency.

12 Dead Letter Queue

Dead letter queue is used to deal messages that cannot be consumed normally. When a message is consumed failed at first time, the message queue will automatically resend the message. If the consumption still fails after the maximum number retry, it indicates that the consumer cannot properly consume the message under normal circumstances. At this time, the message queue will not immediately abandon the message, but send it to the special queue corresponding to the consumer.

RocketMQ defines the messages that could not be consumed under normal circumstances as Dead-Letter Messages, and the special queue in which the Dead-Letter Messages are saved as Dead-Letter Queues. In RocketMQ, the consumer instance can consume again by resending messages in the Dead-Letter Queue using console.

Consumer


1 Consumption process idempotent

RocketMQ cannot avoid Exactly-Once, so if the business is very sensitive to consumption repetition, it is important to perform deduplication at the business level. Deduplication can be done with a relational database. First, you need to determine the unique key of the message, which can be either msgId or a unique identifier field in the message content, such as the order Id. Determine if a unique key exists in the relational database before consumption. If it does not exist, insert it and consume it, otherwise skip it. (The actual process should consider the atomic problem, determine whether there is an attempt to insert, if the primary key conflicts, the insertion fails, skip directly)

2 Slow message processing

2.1 Increase consumption parallelism

Most messages consumption behaviors are IO-intensive, That is, it may be to operate the database, or call RPC. The consumption speed of such consumption behavior lies in the throughput of the back-end database or the external system. By increasing the consumption parallelism, the total consumption throughput can be increased, but the degree of parallelism is increased to a certain extent. Instead it will fall.Therefore, the application must set a reasonable degree of parallelism. There are several ways to modify the degree of parallelism of consumption as follows:

  • Under the same ConsumerGroup, increase the degree of parallelism by increasing the number of Consumer instances (note that the Consumer instance that exceeds the number of subscription queues is invalid). Can be done by adding machines, or by starting multiple processes on an existing machine.
  • Improve the consumption parallel thread of a single Consumer by modifying the parameters consumeThreadMin and consumeThreadMax.

2.2 Batch mode consumption

Some business processes can increase consumption throughput to a large extent if they support batch mode consumption. For example, order deduction application, it takes 1s to process one order at a time, and it takes only 2s to process 10 orders at a time. In this way, the throughput of consumption can be greatly improved. By setting the consumer's consumeMessageBatchMaxSize to return a parameter, the default is 1, that is, only one message is consumed at a time, for example, set to N, then the number of messages consumed each time is less than or equal to N.

2.3 Skip non-critical messages

When a message is accumulated, if the consumption speed cannot keep up with the transmission speed, if the service does not require high data, you can choose to discard the unimportant message. For example, when the number of messages in a queue is more than 100,000 , try to discard some or all of the messages, so that you can quickly catch up with the speed of sending messages. The sample code is as follows:

public ConsumeConcurrentlyStatus consumeMessage(
        List msgs,
        ConsumeConcurrentlyContext context){
   long offest = msgs.get(0).getQueueOffset();
   String maxOffset =    
               msgs.get(0).getProperty(Message.PROPERTY_MAX_OFFSET);
   long diff = Long.parseLong(maxOffset) - offset;
   if(diff > 100000){
        //TODO Special handling of message accumulation
       return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
    //TODO Normal consumption process
    return ConcumeConcurrentlyStatus.CONSUME_SUCCESS;
}

2.4 Optimize each message consumption process

For example, the consumption process of a message is as follows:

  • Query from DB according to the message [data 1]
  • Query from DB according to the message [data 2]
  • Complex business calculations
  • Insert [Data 3] into the DB
  • Insert [Data 4] into the DB

There are 4 interactions with the DB in the consumption process of this message. If it is calculated by 5ms each time, it takes a total of 20ms. If the business calculation takes 5ms, then the total time is 25ms, So if you can optimize 4 DB interactions to 2 times, the total time can be optimized to 15ms, which means the overall performance is increased by 40%. Therefore, if the application is sensitive to delay, the DB can be deployed on the SSD hard disk. Compared with the SCSI disk, the former RT will be much smaller.

3 Print Log

If the amount of messages is small, it is recommended to print the message in the consumption entry method, consume time, etc., to facilitate subsequent troubleshooting.

public ConsumeConcurrentlyStatus consumeMessage(
          List msgs,
    ConsumeConcurrentlyContext context){
    log.info("RECEIVE_MSG_BEGIN: " + msgs.toString());
    //TODO Normal consumption process
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}

If you can print the time spent on each message, it will be more convenient when troubleshooting online problems such as slow consumption.

4 Other consumption suggestions

4.1、Consumer Group and Subscriptions

The first thing you should be aware of is that different Consumer Group can consume the same topic independently, and each of them will have their own consuming offsets. Please make sure each Consumer within the same Group to subscribe the same topics.

4.2、Orderly

The Consumer will lock each MessageQueue to make sure it is consumed one by one in order. This will cause a performance loss, but it is useful when you care about the order of the messages. It is not recommended to throw exceptions, you can return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT instead.

4.3、Concurrently

As the name tells, the Consumer will consume the messages concurrently. It is recommended to use this for good performance. It is not recommended to throw exceptions, you can return ConsumeConcurrentlyStatus.RECONSUME_LATER instead.

4.4、Consume Status

For MessageListenerConcurrently, you can return RECONSUME_LATER to tell the consumer that you can not consume it right now and want to reconsume it later. Then you can continue to consume other messages. For MessageListenerOrderly, because you care about the order, you can not jump over the message, but you can return SUSPEND_CURRENT_QUEUE_A_MOMENT to tell the consumer to wait for a moment.

4.5、Blocking

It is not recommend to block the Listener, because it will block the thread pool, and eventually may stop the consuming process.

4.6、Thread Number

The consumer use a ThreadPoolExecutor to process consuming internally, so you can change it by setting setConsumeThreadMin or setConsumeThreadMax.

4.7、ConsumeFromWhere

When a new Consumer Group is established, it will need to decide whether it needs to consume the historical messages which had already existed in the Broker. CONSUME_FROM_LAST_OFFSET will ignore the historical messages, and consume anything produced after that. CONSUME_FROM_FIRST_OFFSET will consume every message existed in the Broker. You can also use CONSUME_FROM_TIMESTAMP to consume messages produced after the specified timestamp.

Producer


1 Message Sending Tips
1.1 The Use of Tags

One application instance should use one topic as much as possible and the subtype of messages can be marked by tags. Tag provides extra flexibility to users. In the consume subscribing process, the messages filtering can only be handled by using tags when the tags are specified in the message sending process: message.setTags("TagA").

1.2 The Use of Keys

A business key can be set in one message and it will be easier to look up the message on a broker server to diagnose issues during development. Each message will be created index(hash index) by server, instance can query the content of this message by topic and key and who consumes the message.Because of the hash index, make sure that the key should be unique in order to avoid potential hash index conflict.

// Order Id
String orderId = "20034568923546";
message.setKeys(orderId);
1.3 The Log Print

When sending a message,no matter success or fail, a message log must be printed which contains SendResult and Key. It is assumed that we will always get SEND_OK if no exception is thrown. Below is a list of descriptions about each status:

  • SEND_OK

SEND_OK means sending message successfully. SEND_OK does not mean it is reliable. To make sure no message would be lost, you should also enable SYNC_MASTER or SYNC_FLUSH.

  • FLUSH_DISK_TIMEOUT

FLUSH_DISK_TIMEOUT means sending message successfully but the Broker flushing the disk with timeout. In this kind of condition, the Broker has saved this message in memory, this message will be lost only if the Broker was down. The FlushDiskType and SyncFlushTimeout could be specified in MessageStoreConfig. If the Broker set MessageStoreConfig’s FlushDiskType=SYNC_FLUSH(default is ASYNC_FLUSH), and the Broker doesn’t finish flushing the disk within MessageStoreConfig’s syncFlushTimeout(default is 5 secs), you will get this status.

  • FLUSH_SLAVE_TIMEOUT

FLUSH_SLAVE_TIMEOUT means sending messages successfully but the slave Broker does not finish synchronizing with the master. If the Broker’s role is SYNC_MASTER(default is ASYNC_MASTER), and the slave Broker doesn’t finish synchronizing with the master within the MessageStoreConfig’s syncFlushTimeout(default is 5 secs), you will get this status.

  • SLAVE_NOT_AVAILABLE

SLAVE_NOT_AVAILABLE means sending messages successfully but no slave Broker configured. If the Broker’s role is SYNC_MASTER(default is ASYNC_MASTER), but no slave Broker is configured, you will get this status.

2 Operations on Message Sending failed

The send method of Producer can be retried, the retry process is illustrated below:

  • The method will retry at most 2 times(2 times in synchronous mode, 0 times in asynchronous mode).
  • If sending failed, it will turn to the next Broker. This strategy will be executed when the total costing time is less then sendMsgTimeout(default is 10 seconds).
  • The retry method will be terminated if timeout exception was thrown when sending messages to Broker.

The strategy above could make sure message sending successfully to a certain degree. Some more retry strategies, such as we could try to save the message to database if calling the send synchronous method failed and then retry by background thread's timed tasks, which will make sure the message is sent to Broker,could be improved if asking for high reliability business requirement.

The reasons why the retry strategy using database have not integrated by the RocketMQ client will be explained below: Firstly, the design mode of the RocketMQ client is stateless mode. It means that the client is designed to be horizontally scalable at each level and the consumption of the client to physical resources is only CPU, memory and network. Then, if a key-value memory module is integrated by the client itself, the Asyn-Saving strategy will be utilized in consideration of the high resource consumption of the Syn-Saving strategy. However, given that operations staff does not manage the client shutoff, some special commands, such as kill -9, may be used which will lead to the lost of message because of no saving in time. Furthermore, the physical resource running Producer is not appropriate to save some significant data because of low reliability. Above all, the retry process should be controlled by program itself.

3 Send Messages in One-way Mode

The message sending is usually a process like below:

  • Client sends request to sever.
  • Sever handles request
  • Sever returns response to client

The total costing time of sending one message is the sum of costing time of three steps above. Some situations demand that total costing time must be in a quite low level, however, do not take reliable performance into consideration, such as log collection. This kind of application could be called in one-way mode, which means client sends request but not wait for response. In this kind of mode, the cost of sending request is only a call of system operation which means one operation writing data to client socket buffer. Generally, the time cost of this process will be controlled n microseconds level.

1 Key Attributes of Message Trace Data

ProducerConsumerBroker
production instance informationconsumption instance informationmessage Topic
send message timepost time, post roundmessage storage location
whether the message was sent successfullyWhether the message was successfully consumedThe Key of the message
Time spent sendingTime spent consumingTag of the message

2 Support for Message Trace Cluster Deployment

2.1 Broker Configuration Fille

The properties profile content of the Broker side enabled message trace feature is pasted here:

brokerClusterName=DefaultCluster
brokerName=broker-a
brokerId=0
deleteWhen=04
fileReservedTime=48
brokerRole=ASYNC_MASTER
flushDiskType=ASYNC_FLUSH
storePathRootDir=/data/rocketmq/rootdir-a-m
storePathCommitLog=/data/rocketmq/commitlog-a-m
autoCreateSubscriptionGroup=true
## if msg tracing is open,the flag will be true
traceTopicEnable=true
listenPort=10911
brokerIP1=XX.XX.XX.XX1
namesrvAddr=XX.XX.XX.XX:9876

2.2 Normal Mode

Each Broker node in the RocketMQ cluster is used to store message trace data collected and sent from the Client.Therefore, there are no requirements or restrictions on the number of Broker nodes in the RocketMQ cluster.

2.3 Physical IO Isolation Mode

For scenarios with large amount of trace message data , one of the Broker nodes in the RocketMQ cluster can be selected to store the trace message , so that the common message data of the user and the physical IO of the trace message data are completely isolated from each other.In this mode, there are at least two Broker nodes in the RockeMQ cluster, one of which is defined as the server on which message trace data is stored.

2.4 Start the Broker that Starts the MessageTrace

nohup sh mqbroker -c ../conf/2m-noslave/broker-a.properties &

3 Save the Topic Definition of Message Trace

RocketMQ's message trace feature supports two ways to store trace data:

3.1 System-level TraceTopic

By default, message track data is stored in the system-level TraceTopic(names:RMQ_SYS_TRACE_TOPIC)。This Topic is automatically created when the Broker node is started(As described above, the switch variable traceTopicEnable needs to be set to true in the Broker configuration file)。

3.2 Custom TraceTopic

If the user is not prepared to store the message track data in the system-level default TraceTopic, you can also define and create a user-level Topic to save the track (that is, to create a regular Topic to save the message track data)。The following section introduces how the Client interface supports the user-defined TraceTopic.

4 Client Practices that Support Message Trace

In order to reduce as much as possible the transformation work of RocketMQ message trace feature used in the user service system, the author added a switch parameter (enableMsgTrace) to the original interface in the design to realize whether the message trace is opened or not.

4.1 Opening the Message Trace when Sending the Message

        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName",true);
        producer.setNamesrvAddr("XX.XX.XX.XX1");
        producer.start();
            try {
                {
                    Message msg = new Message("TopicTest",
                        "TagA",
                        "OrderID188",
                        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                    SendResult sendResult = producer.send(msg);
                    System.out.printf("%s%n", sendResult);
                }

        } catch (Exception e) {
            e.printStackTrace();
        }

4.2 Opening Message Trace whenSubscribing to a Message

        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1",true);
        consumer.subscribe("TopicTest", "*");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.setConsumeTimestamp("20181109221800");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.printf("Consumer Started.%n");

4.3 Support for Custom Storage Message Trace Topic

The initialization of DefaultMQProducer and DefaultMQPushConsumer instances can be changed to support the custom storage message trace Topic as follows when sending and subscriving messages above.

        ##Where Topic_test11111 needs to be pre-created by the user to save the message trace;
        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName",true,"Topic_test11111");
        ......

    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1",true,"Topic_test11111");
    ......


Two functions below are provided in the basic sample:

  • The RocketMQ can be utilized to send messages in three ways: reliable synchronous, reliable asynchronous, and one-way transmission. The first two message types are reliable because there is a response whether they were sent successfully.
  • The RocketMQ can be utilized to consume messages.

1 Add Dependency

maven:


  org.apache.rocketmq
  rocketmq-client
  4.3.0

gradle:

compile 'org.apache.rocketmq:rocketmq-client:4.3.0'

2 Send Messages

2.1 Use Producer to Send Synchronous Messages

Reliable synchronous transmission is used in extensive scenes, such as important notification messages, SMS notification.

public class SyncProducer {
  public static void main(String[] args) throws Exception {
    // Instantiate with a producer group name
    DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
    // Specify name server addresses
    producer.setNamesrvAddr("localhost:9876");
    // Launch the producer instance
    producer.start();
    for (int i = 0; i < 100; i++) {
      // Create a message instance with specifying topic, tag and message body
      Message msg = new Message("TopicTest" /* Topic */,
        "TagA" /* Tag */,
        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
        );
      // Send message to one of brokers
      SendResult sendResult = producer.send(msg);
      // Check whether the message has been delivered by the callback of sendResult
      System.out.printf("%s%n", sendResult);
    }
    // Shut down once the producer instance is not longer in use
    producer.shutdown();
  }
}
2.2 Send Asynchronous Messages

Asynchronous transmission is generally used in response time sensitive business scenarios. It means that it is unable for the sender to wait the response of the Broker too long.

public class AsyncProducer {
  public static void main(String[] args) throws Exception {
    // Instantiate with a producer group name
    DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
    // Specify name server addresses
    producer.setNamesrvAddr("localhost:9876");
    // Launch the producer instance
    producer.start();
    producer.setRetryTimesWhenSendAsyncFailed(0);
    for (int i = 0; i < 100; i++) {
      final int index = i;
      // Create a message instance with specifying topic, tag and message body
      Message msg = new Message("TopicTest",
        "TagA",
        "OrderID188",
        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
      // SendCallback: receive the callback of the asynchronous return result.
      producer.send(msg, new SendCallback() {
        @Override
        public void onSuccess(SendResult sendResult) {
          System.out.printf("%-10d OK %s %n", index,
            sendResult.getMsgId());
        }
        @Override
        public void onException(Throwable e) {
          System.out.printf("%-10d Exception %s %n", index, e);
          e.printStackTrace();
        }
      });
    }
    // Shut down once the producer instance is not longer in use
    producer.shutdown();
  }
}
2.3 Send Messages in One-way Mode

One-way transmission is used for cases requiring moderate reliability, such as log collection.

public class OnewayProducer {
  public static void main(String[] args) throws Exception{
    // Instantiate with a producer group name
    DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
    // Specify name server addresses
    producer.setNamesrvAddr("localhost:9876");
    // Launch the producer instance
    producer.start();
    for (int i = 0; i < 100; i++) {
      // Create a message instance with specifying topic, tag and message body
      Message msg = new Message("TopicTest" /* Topic */,
        "TagA" /* Tag */,
        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
      );
      // Send in one-way mode, no return result
      producer.sendOneway(msg);
    }
    // Shut down once the producer instance is not longer in use
     producer.shutdown();
  }
}

3 Consume Messages

public class Consumer {
  public static void main(String[] args) throws InterruptedException, MQClientException {
    // Instantiate with specified consumer group name
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
    
    // Specify name server addresses
    consumer.setNamesrvAddr("localhost:9876");

// Subscribe one or more topics and tags for finding those messages need to be consumed
consumer.subscribe("TopicTest", "*");
// Register callback to execute on arrival of messages fetched from brokers
consumer.registerMessageListener(new MessageListenerConcurrently() {
  @Override
  public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
    System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
    // Mark the message that have been consumed successfully
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  }
});
// Launch the consumer instance
consumer.start();
System.out.printf("Consumer Started.%n");

}
}

RocketMQ provides ordered messages using FIFO order. All related messages need to be sent into the same message queue in an orderly manner.

The following demonstrates ordered messages by ensuring order of create, pay, send and finish steps of sales order process.

1 produce ordered messages

package org.apache.rocketmq.example.order2

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;

import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

/*

  • ordered messages producer
    */
    public class Producer {

    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();
        String[] tags = new String[]{"TagA", "TagC", "TagD"};
        // sales orders list
        List<OrderStep> orderList = new Producer().buildOrders();
    
        Date date = new Date();
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        String dateStr = sdf.format(date);
    
        for (int i = 0; i < 10; i++) {
            // generate message timestamp
            String body = dateStr + " Hello RocketMQ " + orderList.get(i);
            Message msg = new Message("TopicTest", tags[i % tags.length], "KEY" + i, body.getBytes());
    
            SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                @Override
                public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                    Long id = (Long) arg;  //message queue is selected by #salesOrderID
                    long index = id % mqs.size();
                    return mqs.get((int) index);
                }
            }, orderList.get(i).getOrderId());
    
            System.out.println(String.format("SendResult status:%s, queueId:%d, body:%s",
                    sendResult.getSendStatus(),
                    sendResult.getMessageQueue().getQueueId(),
                    body));
        }
    
        producer.shutdown();
    }
    
    /**
     * each sales order step
     */
    private static class OrderStep {
        private long orderId;
        private String desc;
    
        public long getOrderId() {
            return orderId;
        }
    
        public void setOrderId(long orderId) {
            this.orderId = orderId;
        }
    
        public String getDesc() {
            return desc;
        }
    
        public void setDesc(String desc) {
            this.desc = desc;
        }
    
        @Override
        public String toString() {
            return "OrderStep{" +
                    "orderId=" + orderId +
                    ", desc='" + desc + '\'' +
                    '}';
        }
    }
    
    /**
     * to generate ten OrderStep objects for three sales orders:
     *  #SalesOrder "15103111039L": create, pay, send, finish;
     *  #SalesOrder "15103111065L": create, pay, finish;
     *  #SalesOrder "15103117235L": create, pay, finish;
     */
    private List<OrderStep> buildOrders() {
    
        List<OrderStep> orderList = new ArrayList<OrderStep>();
    
        //create sales order with orderid="15103111039L"
        OrderStep orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111039L);
        orderDemo.setDesc("create");
        orderList.add(orderDemo);
    
        //create sales order with orderid="15103111065L"
        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111065L);
        orderDemo.setDesc("create");
        orderList.add(orderDemo);
    
        //pay sales order #"15103111039L"
        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111039L);
        orderDemo.setDesc("pay");
        orderList.add(orderDemo);
    
        //create sales order with orderid="15103117235L"
        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103117235L);
        orderDemo.setDesc("create");
        orderList.add(orderDemo);
    
        //pay sales order #"15103111065L"
        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111065L);
        orderDemo.setDesc("pay");
        orderList.add(orderDemo);
    
        //pay sales order #"15103117235L"
        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103117235L);
        orderDemo.setDesc("pay");
        orderList.add(orderDemo);
    
        //mark sales order #"15103111065L" as "finish"
        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111065L);
        orderDemo.setDesc("finish");
        orderList.add(orderDemo);
    
        //mark mark sales order #"15103111039L" as "send"
        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111039L);
        orderDemo.setDesc("send");
        orderList.add(orderDemo);
    
        ////mark sales order #"15103117235L" as "finish"
        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103117235L);
        orderDemo.setDesc("finish");
        orderList.add(orderDemo);
    
        //mark sales order #"15103111039L" as "finish"
        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111039L);
        orderDemo.setDesc("finish");
        orderList.add(orderDemo);
    
        return orderList;
    }
    

    }

2 Consume ordered messages


package org.apache.rocketmq.example.order2;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeUnit;

/**

  • consume messages in order
    */
    public class ConsumerInOrder {

    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        /**
         * when the consumer is first run, the start point of message queue where it can get messages will be set.
         * or if it is restarted, it will continue from the last place to get messages.
         */
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
    
        consumer.subscribe("TopicTest", "TagA || TagC || TagD");
    
        consumer.registerMessageListener(new MessageListenerOrderly() {
    
            Random random = new Random();
    
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                context.setAutoCommit(true);
                for (MessageExt msg : msgs) {
                    // one consumer for each message queue, and messages order are kept in a single message queue.
                    System.out.println("consumeThread=" + Thread.currentThread().getName() + "queueId=" + msg.getQueueId() + ", content:" + new String(msg.getBody()));
                }
    
                try {
                    TimeUnit.SECONDS.sleep(random.nextInt(10));
                } catch (Exception e) {
                    e.printStackTrace();
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });
    
        consumer.start();
    
        System.out.println("Consumer Started.");
    }
    

    }