

In most cases, a tag is a simple and useful way to select the messages you want. For example:

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_EXAMPLE");
consumer.subscribe("TOPIC", "TAGA || TAGB || TAGC");

The consumer will receive messages that carry TAGA, TAGB, or TAGC. The limitation is that a message can have only one tag, which may not be enough for sophisticated scenarios. In such cases, you can use an SQL expression to filter messages.

The SQL feature evaluates expressions over the properties you set when sending messages. Within the grammar defined by RocketMQ, you can implement some interesting logic. Here is an example:

------------
| message  |
|----------|  a > 5 AND b = 'abc'
| a = 10   |  --------------------> Gotten
| b = 'abc'|
| c = true |
------------
------------
| message  |
|----------|   a > 5 AND b = 'abc'
| a = 1    |  --------------------> Missed
| b = 'abc'|
| c = true |
------------
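
To make the diagram concrete, here is a minimal plain-Java sketch of what evaluating `a > 5 AND b = 'abc'` against message properties amounts to. It is not RocketMQ's filter implementation: the class `SqlFilterSketch` and its methods are hypothetical, the expression is hard-coded, and properties are assumed to be carried as strings.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of the RocketMQ API: it hard-codes the
// expression "a > 5 AND b = 'abc'" to show how user properties are evaluated.
final class SqlFilterSketch {

    static boolean matches(Map<String, String> props) {
        String a = props.get("a");
        String b = props.get("b");
        if (a == null || b == null) {
            return false; // a missing property cannot satisfy the comparison
        }
        double aValue;
        try {
            aValue = Double.parseDouble(a); // assumption: properties travel as strings
        } catch (NumberFormatException e) {
            return false; // a non-numeric value cannot satisfy "a > 5"
        }
        return aValue > 5 && "abc".equals(b);
    }

    // Builds the property map of one message from the diagram.
    static Map<String, String> message(String a, String b, String c) {
        Map<String, String> props = new HashMap<>();
        props.put("a", a);
        props.put("b", b);
        props.put("c", c);
        return props;
    }

    public static void main(String[] args) {
        System.out.println(matches(message("10", "abc", "true"))); // first message in the diagram
        System.out.println(matches(message("1", "abc", "true")));  // second message in the diagram
    }
}
```

The first message satisfies both conditions, while the second fails `a > 5` and is therefore skipped.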

1 Grammars

RocketMQ only defines some basic grammars to support this feature. You could also extend it easily.

  • Numeric comparison, like >, >=, <, <=, BETWEEN, =;
  • Character comparison, like =, <>, IN;
  • IS NULL or IS NOT NULL;
  • Logical AND, OR, NOT;

Constant types are:

  • Numeric, like 123, 3.1415;
  • Character, like 'abc', must be enclosed in single quotes;
  • NULL, special constant;
  • Boolean, TRUE or FALSE;

2 Usage constraints

Only the push consumer can select messages by SQL92. The interface is:

public void subscribe(final String topic, final MessageSelector messageSelector)

3 Producer example

You can attach properties to a message with the putUserProperty method when sending.

DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.start();
for (int i = 0; i < 10; i++) {
    // tag is any tag string chosen by the application
    Message msg = new Message("TopicTest",
       tag,
       ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
    );
    // Set some properties.
    msg.putUserProperty("a", String.valueOf(i));
    SendResult sendResult = producer.send(msg);
}

producer.shutdown();

4 Consumer example

Use MessageSelector.bySql to select messages through SQL when consuming.

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
// Only subscribe to messages that have property a, with a >= 0 and a <= 3
consumer.subscribe("TopicTest", MessageSelector.bySql("a between 0 and 3"));
consumer.registerMessageListener(new MessageListenerConcurrently() {
   @Override
   public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
       return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
   }
});
consumer.start();

The following questions are frequently asked with regard to the RocketMQ project in general.

1 General

  1. Why did we create the RocketMQ project instead of selecting other products?

    Please refer to Why RocketMQ

  2. Do I have to install other software, such as ZooKeeper, to use RocketMQ?

    No. RocketMQ can run independently.

2 Usage

1. Where does the newly created Consumer ID start consuming messages?

 1) If messages were sent to the topic within the last three days, the consumer starts consuming from the first message saved on the server.

 2) If messages were sent to the topic more than three days ago, the consumer starts consuming from the latest message on the server, in other words, from the tail of the message queue.

 3) If the consumer is restarted, it resumes consuming from its last consumption position.

2. How to reconsume a message when consumption fails?

 1) Cluster consumption pattern: if the consumer business logic returns Action.ReconsumeLater, returns NULL, or throws an exception, the message is treated as failed. It is retried up to 16 times, after which the message is discarded.

 2) Broadcast consumption pattern: broadcast consumption still ensures that a message is consumed at least once, but no resend option is provided.

3. How to query the failed message if there is a consumption failure?

 1) Using a topic query by time, you can query messages within a period of time.

 2) Using the Topic and Message Id, you can query the exact message.

 3) Using the Topic and Message Key, you can query the class of messages that share the same Message Key.

4. Are messages delivered exactly once?

RocketMQ ensures that all messages are delivered at least once. In most cases, the messages are not repeated.

5. How to add a new broker?

 1) Start up a new broker and register it to the same list of name servers.

 2) By default, only internal system topics and consumer groups are created automatically. If you would like to have your business topic and consumer groups on the new node, please replicate them from the existing broker. Admin tool and command lines are provided to handle this.

3 Configuration related

The following answers are all default values and can be modified by configuration.

1. How long are the messages saved on the server?

Stored messages will be saved for up to 3 days, and messages that are not consumed for more than 3 days will be deleted.

2. What is the size limit for message Body?

Generally 256KB.

3. How to set the number of consumer threads?

When you start the Consumer, set its minimum and maximum consume thread counts. An example is as follows:

consumer.setConsumeThreadMin(20);
consumer.setConsumeThreadMax(20);

4 Errors

1. Starting a producer or consumer fails, and the error message says that the producer group or consumer group is repeated.

Reason: Using the same Producer/Consumer Group to launch multiple Producer/Consumer instances in the same JVM may cause the client to fail to start.

Solution: Make sure that, within one JVM, each Producer/Consumer Group starts only one Producer/Consumer instance.

2. The consumer fails to load the JSON file at startup in broadcast mode.

Reason: The Fastjson version is too low to allow the broadcast consumer to load the local offsets.json, which causes the consumer to fail at boot. A damaged Fastjson file can also cause the same problem.

Solution: Upgrade Fastjson to the version that the RocketMQ client depends on, so that the local offsets.json can be loaded. By default, the offsets.json file is in /home/{user}/.rocketmq_offsets. Alternatively, check the integrity of the Fastjson file.

3. What is the impact of a broker crash?

 1) Master crashes

Messages can no longer be sent to this broker set, but if you have another broker set available, messages can still be sent given the topic is present. Messages can still be consumed from slaves.

 2) Some slaves crash

As long as there is another working slave, there will be no impact on sending messages. There will also be no impact on consuming messages, unless the consumer group is set to prefer consuming from this slave. By default, a consumer group consumes from the master.

 3) All slaves crash

There will be no impact on sending messages to the master, but if the master is SYNC_MASTER, the producer will get a SLAVE_NOT_AVAILABLE result indicating that the message was not sent to any slave. There will also be no impact on consuming messages, unless the consumer group is set to prefer consuming from a slave. By default, a consumer group consumes from the master.

4. Producer complains “No Topic Route Info”, how to diagnose?

This happens when you are trying to send messages to a topic whose routing info is not available to the producer.

 1) Make sure that the producer can connect to a name server and is capable of fetching routing meta info from it.

 2) Make sure that name servers do contain routing meta info of the topic. You may query the routing meta info from name server through topicRoute using admin tools or web console.

 3) Make sure that your brokers are sending heartbeats to the same list of name servers your producer is connecting to.

 4) Make sure that the topic’s permission is 6(rw-), or at least 2(-w-).

If you can’t find this topic, create it on a broker via admin tools command updateTopic or web console.
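
The permission values in step 4) read naturally as a bit mask. The sketch below assumes, based only on the 6(rw-) and 2(-w-) notation above, that the read bit is 4 and the write bit is 2; the class `TopicPermSketch` is hypothetical and is not the RocketMQ API.

```java
// Hypothetical helper: decodes a topic permission value as a bit mask.
// Assumption (inferred from "6(rw-)" and "2(-w-)"): read = 4, write = 2.
final class TopicPermSketch {
    static final int PERM_READ = 1 << 2;  // 4
    static final int PERM_WRITE = 1 << 1; // 2

    static boolean isReadable(int perm) {
        return (perm & PERM_READ) == PERM_READ;
    }

    static boolean isWritable(int perm) {
        return (perm & PERM_WRITE) == PERM_WRITE;
    }

    // Renders the value in the same "rw-" style used in the text.
    static String describe(int perm) {
        return (isReadable(perm) ? "r" : "-") + (isWritable(perm) ? "w" : "-") + "-";
    }

    public static void main(String[] args) {
        for (int perm : new int[] {6, 4, 2}) {
            System.out.println(perm + " -> " + describe(perm)
                + ", producer can send: " + isWritable(perm));
        }
    }
}
```

Under this reading, a producer needs the write bit, which is why a topic with permission 4 would be readable by consumers but would still reject sends.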

OpenMessaging includes the establishment of industry guidelines and of messaging and streaming specifications, to provide a common framework for the finance, e-commerce, IoT, and big-data areas. Its design principles are cloud orientation, simplicity, flexibility, and language independence in distributed heterogeneous environments. Conformance to these specifications makes it possible to develop heterogeneous messaging applications across all major platforms and operating systems.

RocketMQ provides a partial implementation of OpenMessaging 0.1.0-alpha. The following examples demonstrate how to access RocketMQ based on OpenMessaging.

OMSProducer

The following example shows how to send messages to a RocketMQ broker using synchronous, asynchronous, or one-way transmission.

public class OMSProducer {
    public static void main(String[] args) {
        final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory
            .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");

        final Producer producer = messagingAccessPoint.createProducer();

        messagingAccessPoint.startup();
        System.out.printf("MessagingAccessPoint startup OK%n");

        producer.startup();
        System.out.printf("Producer startup OK%n");

        {
            Message message = producer.createBytesMessageToTopic("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8")));
            SendResult sendResult = producer.send(message);
            System.out.printf("Send sync message OK, msgId: %s%n", sendResult.messageId());
        }

        {
            final Promise<SendResult> result = producer.sendAsync(producer.createBytesMessageToTopic("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8"))));
            result.addListener(new PromiseListener<SendResult>() {
                @Override
                public void operationCompleted(Promise<SendResult> promise) {
                    System.out.printf("Send async message OK, msgId: %s%n", promise.get().messageId());
                }

                @Override
                public void operationFailed(Promise<SendResult> promise) {
                    System.out.printf("Send async message Failed, error: %s%n", promise.getThrowable().getMessage());
                }
            });
        }

        {
            producer.sendOneway(producer.createBytesMessageToTopic("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8"))));
            System.out.printf("Send oneway message OK%n");
        }

        producer.shutdown();
        messagingAccessPoint.shutdown();
    }
}

OMSPullConsumer

Use OMS PullConsumer to poll messages from a specified queue.

public class OMSPullConsumer {
    public static void main(String[] args) {
        final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory
            .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");

        final PullConsumer consumer = messagingAccessPoint.createPullConsumer("OMS_HELLO_TOPIC",
            OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP, "OMS_CONSUMER"));

        messagingAccessPoint.startup();
        System.out.printf("MessagingAccessPoint startup OK%n");

        consumer.startup();
        System.out.printf("Consumer startup OK%n");

        Message message = consumer.poll();
        if (message != null) {
            String msgId = message.headers().getString(MessageHeader.MESSAGE_ID);
            System.out.printf("Received one message: %s%n", msgId);
            consumer.ack(msgId);
        }

        consumer.shutdown();
        messagingAccessPoint.shutdown();
    }
}

OMSPushConsumer

Attach an OMS PushConsumer to a specified queue and consume messages with a MessageListener.

public class OMSPushConsumer {
    public static void main(String[] args) {
        final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory
            .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");

        final PushConsumer consumer = messagingAccessPoint.
            createPushConsumer(OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP, "OMS_CONSUMER"));

        messagingAccessPoint.startup();
        System.out.printf("MessagingAccessPoint startup OK%n");

        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            @Override
            public void run() {
                consumer.shutdown();
                messagingAccessPoint.shutdown();
            }
        }));

        consumer.attachQueue("OMS_HELLO_TOPIC", new MessageListener() {
            @Override
            public void onMessage(final Message message, final ReceivedMessageContext context) {
                System.out.printf("Received one message: %s%n", message.headers().getString(MessageHeader.MESSAGE_ID));
                context.ack();
            }
        });
    }
}

1 Transaction message status

A transactional message has three states:

  • TransactionStatus.CommitTransaction: commit transaction; consumers are allowed to consume this message.
  • TransactionStatus.RollbackTransaction: rollback transaction; the message will be deleted and cannot be consumed.
  • TransactionStatus.Unknown: intermediate state; MQ needs to check back to determine the status.

2 Send transactional message example

2.1 Create the transactional producer

Use the TransactionMQProducer class to create the producer client and specify a unique ProducerGroup. You can also set up a custom thread pool to process check requests. After executing the local transaction, reply to MQ according to the execution result; the reply statuses are described in the section above.

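import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import java.io.UnsupportedEncodingException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;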
public class TransactionProducer {
   public static void main(String[] args) throws MQClientException, InterruptedException {
       TransactionListener transactionListener = new TransactionListenerImpl();
       TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
       ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
           @Override
           public Thread newThread(Runnable r) {
               Thread thread = new Thread(r);
               thread.setName("client-transaction-msg-check-thread");
               return thread;
           }
       });
       producer.setExecutorService(executorService);
       producer.setTransactionListener(transactionListener);
       producer.start();
       String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
       for (int i = 0; i < 10; i++) {
           try {
               Message msg =
                   new Message("TopicTest1234", tags[i % tags.length], "KEY" + i,
                       ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
               SendResult sendResult = producer.sendMessageInTransaction(msg, null);
               System.out.printf("%s%n", sendResult);
               Thread.sleep(10);
           } catch (MQClientException | UnsupportedEncodingException e) {
               e.printStackTrace();
           }
       }
       for (int i = 0; i < 100000; i++) {
           Thread.sleep(1000);
       }
       producer.shutdown();
   }
}

2.2 Implement the TransactionListener interface

The executeLocalTransaction method executes the local transaction once the half message has been sent successfully. It returns one of the three transaction statuses mentioned in the previous section.

The checkLocalTransaction method checks the local transaction status and responds to MQ check requests. It also returns one of the three transaction statuses mentioned in the previous section.

public class TransactionListenerImpl implements TransactionListener {
  private AtomicInteger transactionIndex = new AtomicInteger(0);
  private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();
  @Override
  public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
      int value = transactionIndex.getAndIncrement();
      int status = value % 3;
      localTrans.put(msg.getTransactionId(), status);
      return LocalTransactionState.UNKNOW;
  }
  @Override
  public LocalTransactionState checkLocalTransaction(MessageExt msg) {
      Integer status = localTrans.get(msg.getTransactionId());
      if (null != status) {
          switch (status) {
              case 0:
                  return LocalTransactionState.UNKNOW;
              case 1:
                  return LocalTransactionState.COMMIT_MESSAGE;
              case 2:
                  return LocalTransactionState.ROLLBACK_MESSAGE;
          }
      }
      return LocalTransactionState.COMMIT_MESSAGE;
  }
}

3 Usage Constraint

  1. Transactional messages do not support scheduling or batching.
  2. To prevent a single message from being checked too many times, which would cause half-queue messages to accumulate, the number of checks for a single message is limited to 15 by default. Users can change this limit through the transactionCheckMax parameter in the broker configuration. If a message has been checked more than transactionCheckMax times, the broker by default discards the message and prints an error log. Users can change this behavior by overriding the AbstractTransactionalMessageCheckListener class.
  3. A transactional message is checked after a period of time determined by the transactionTimeout parameter in the broker configuration. Users can also change this limit by setting the user property CHECK_IMMUNITY_TIME_IN_SECONDS when sending a transactional message; this property takes precedence over the transactionTimeout parameter.
  4. A transactional message may be checked or consumed more than once.
  5. Re-putting a committed message into the user’s target topic may fail. Currently, this depends on the log record. High availability is ensured by the high availability mechanism of RocketMQ itself. If you want to ensure that the transactional message is not lost and that transaction integrity is guaranteed, it is recommended to use the synchronous double write mechanism.
  6. Producer IDs of transactional messages cannot be shared with producer IDs of other types of messages. Unlike other types of messages, transactional messages allow backward queries: the MQ server queries clients by their Producer IDs.
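
Because a transactional message may be consumed more than once, consumers usually need to be idempotent. The following is a minimal sketch of one common approach, deduplicating on a business key; `IdempotentHandlerSketch` is a hypothetical class, the in-memory set stands in for whatever durable store a real system would use, and nothing here is part of the RocketMQ API.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical consumer-side helper: runs the business logic at most once
// per business key, even if the same message is delivered repeatedly.
final class IdempotentHandlerSketch {
    // Assumption: an in-memory set is enough for illustration; a real
    // deployment would need a store that survives restarts.
    private final Set<String> processedKeys = ConcurrentHashMap.newKeySet();
    private final AtomicInteger sideEffects = new AtomicInteger();

    /** Returns true if the business logic ran, false if the key was seen before. */
    boolean handle(String businessKey) {
        if (!processedKeys.add(businessKey)) {
            return false; // duplicate delivery: skip the side effect
        }
        sideEffects.incrementAndGet(); // stand-in for the real business operation
        return true;
    }

    int sideEffects() {
        return sideEffects.get();
    }

    public static void main(String[] args) {
        IdempotentHandlerSketch handler = new IdempotentHandlerSketch();
        System.out.println(handler.handle("KEY1")); // first delivery
        System.out.println(handler.handle("KEY1")); // redelivery of the same message
        System.out.println(handler.sideEffects());
    }
}
```

The message key (for example the "KEY" + i values used by the producer above) is one natural choice for the business key.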


Sending messages in batches improves the performance of delivering small messages. Messages in the same batch must have the same topic and the same waitStoreMsgOK, and they cannot be scheduled messages. You can send up to 4MiB of messages at a time; if you need to send more, it is recommended to divide the larger payload into multiple smaller messages of no more than 1MiB each.
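
The batch rules can be expressed as a small pre-send check. This is only a sketch: `BatchCheckSketch` and its `Msg` type are hypothetical stand-ins rather than RocketMQ classes, and the size is approximated by body length alone, which underestimates the real encoded size.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical pre-send check for the batch rules: same topic, same
// waitStoreMsgOK, no scheduled (delayed) messages, total size within 4MiB.
final class BatchCheckSketch {
    static final int MAX_BATCH_BYTES = 4 * 1024 * 1024;

    // Stand-in for a message; it holds only the fields the rules mention.
    static final class Msg {
        final String topic;
        final boolean waitStoreMsgOK;
        final int delayLevel; // 0 means "not scheduled" in this sketch
        final byte[] body;

        Msg(String topic, boolean waitStoreMsgOK, int delayLevel, byte[] body) {
            this.topic = topic;
            this.waitStoreMsgOK = waitStoreMsgOK;
            this.delayLevel = delayLevel;
            this.body = body;
        }
    }

    static boolean isValidBatch(List<Msg> batch) {
        if (batch.isEmpty()) {
            return false;
        }
        Msg first = batch.get(0);
        long totalBytes = 0;
        for (Msg m : batch) {
            if (!m.topic.equals(first.topic)) return false;               // mixed topics
            if (m.waitStoreMsgOK != first.waitStoreMsgOK) return false;   // mixed flags
            if (m.delayLevel > 0) return false;                           // scheduled message
            totalBytes += m.body.length; // approximation: body bytes only
        }
        return totalBytes <= MAX_BATCH_BYTES;
    }

    public static void main(String[] args) {
        Msg a = new Msg("BatchTest", true, 0, "Hello world 0".getBytes());
        Msg b = new Msg("BatchTest", true, 0, "Hello world 1".getBytes());
        Msg delayed = new Msg("BatchTest", true, 3, "Hello world 2".getBytes());
        System.out.println(isValidBatch(Arrays.asList(a, b)));
        System.out.println(isValidBatch(Arrays.asList(a, delayed)));
    }
}
```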

1 Send Batch Messages

If you just send messages of no more than 4MiB at a time, it is easy to use batch:

String topic = "BatchTest";
List<Message> messages = new ArrayList<>();
messages.add(new Message(topic, "TagA", "OrderID001", "Hello world 0".getBytes()));
messages.add(new Message(topic, "TagA", "OrderID002", "Hello world 1".getBytes()));
messages.add(new Message(topic, "TagA", "OrderID003", "Hello world 2".getBytes()));
try {
    producer.send(messages);
} catch (Exception e) {
    e.printStackTrace();
    //handle the error
}

2 Split into Lists

Complexity grows only when you send a large batch and are not sure whether it exceeds the size limit (4MiB). In that case, you had better split the list:

import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.rocketmq.common.message.Message;

public class ListSplitter implements Iterator<List<Message>> {
    private static final int SIZE_LIMIT = 1024 * 1024 * 4;
    private final List<Message> messages;
    private int currIndex;

    public ListSplitter(List<Message> messages) {
        this.messages = messages;
    }

    @Override
    public boolean hasNext() {
        return currIndex < messages.size();
    }

    @Override
    public List<Message> next() {
        int startIndex = getStartIndex();
        int nextIndex = startIndex;
        int totalSize = 0;
        // Add messages until the next one would push the batch over SIZE_LIMIT.
        for (; nextIndex < messages.size(); nextIndex++) {
            Message message = messages.get(nextIndex);
            int tmpSize = calcMessageSize(message);
            if (tmpSize + totalSize > SIZE_LIMIT) {
                break;
            } else {
                totalSize += tmpSize;
            }
        }
        List<Message> subList = messages.subList(startIndex, nextIndex);
        currIndex = nextIndex;
        return subList;
    }

    // Skip messages that exceed SIZE_LIMIT on their own; they can never fit in a batch.
    private int getStartIndex() {
        while (currIndex < messages.size()
                && calcMessageSize(messages.get(currIndex)) > SIZE_LIMIT) {
            currIndex += 1;
        }
        return currIndex;
    }

    private int calcMessageSize(Message message) {
        int tmpSize = message.getTopic().length() + message.getBody().length;
        Map<String, String> properties = message.getProperties();
        for (Map.Entry<String, String> entry : properties.entrySet()) {
            tmpSize += entry.getKey().length() + entry.getValue().length();
        }
        tmpSize = tmpSize + 20; // Add 20 bytes of log overhead
        return tmpSize;
    }
}

// Then you can split the large list into smaller ones:
ListSplitter splitter = new ListSplitter(messages);
while (splitter.hasNext()) {
    try {
        List<Message> listItem = splitter.next();
        producer.send(listItem);
    } catch (Exception e) {
        e.printStackTrace();
        // handle the error
    }
}

1 Start consumer to wait for incoming subscribed messages

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;

public class ScheduledMessageConsumer {

    public static void main(String[] args) throws Exception {
        // Instantiate message consumer
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ExampleConsumer");
        // Subscribe topics
        consumer.subscribe("TestTopic", "*");
        // Register message listener
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {
                for (MessageExt message : messages) {
                    // Print approximate delay time period
                    System.out.println("Receive message[msgId=" + message.getMsgId() + "] "
                                       + (System.currentTimeMillis() - message.getStoreTimestamp()) + "ms later");
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // Launch consumer
        consumer.start();
    }
}

2 Send scheduled messages

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;

public class ScheduledMessageProducer {

    public static void main(String[] args) throws Exception {
        // Instantiate a producer to send scheduled messages
        DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
        // Launch producer
        producer.start();
        int totalMessagesToSend = 100;
        for (int i = 0; i < totalMessagesToSend; i++) {
            Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());
            // This message will be delivered to consumer 10 seconds later.
            message.setDelayTimeLevel(3);
            // Send the message
            producer.send(message);
        }

        // Shutdown producer after use.
        producer.shutdown();
    }
}

3 Verification

You should see that messages are consumed about 10 seconds after their store time.

4 Use scenarios for scheduled messages

For example, in e-commerce, if an order is submitted, a delay message can be sent, and the status of the order can be checked after 1 hour. If the order is still unpaid, the order can be cancelled and the inventory released.

5 Restrictions on the use of scheduled messages

// org/apache/rocketmq/store/config/MessageStoreConfig.java

private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

RocketMQ currently does not support arbitrary delay times. Instead, it defines several fixed delay levels, numbered 1 to 18, which correspond to delays from 1s to 2h. A message whose consumption fails enters the delay message queue, and the time at which it is sent again depends on the configured delay level and the number of retries.

See SendMessageProcessor.java
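
To show how a delay level maps to an actual delay, here is a small sketch that parses the messageDelayLevel string into a table of level number to milliseconds. `DelayLevelSketch` is a hypothetical helper written for this illustration; it is not the broker's parsing code, and it assumes levels are numbered from 1 in the order they appear.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical parser for a delay level string such as the default shown above.
final class DelayLevelSketch {
    static final String DEFAULT_LEVELS =
        "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

    /** Maps each level number (starting at 1) to its delay in milliseconds. */
    static Map<Integer, Long> parse(String levels) {
        Map<Integer, Long> table = new LinkedHashMap<>();
        String[] parts = levels.trim().split("\\s+");
        for (int i = 0; i < parts.length; i++) {
            String part = parts[i];
            long amount = Long.parseLong(part.substring(0, part.length() - 1));
            char unit = part.charAt(part.length() - 1);
            long unitMillis;
            switch (unit) {
                case 's': unitMillis = 1_000L; break;
                case 'm': unitMillis = 60_000L; break;
                case 'h': unitMillis = 3_600_000L; break;
                default: throw new IllegalArgumentException("Unknown unit in: " + part);
            }
            table.put(i + 1, amount * unitMillis);
        }
        return table;
    }

    public static void main(String[] args) {
        Map<Integer, Long> table = parse(DEFAULT_LEVELS);
        // Level 3 is the one used by setDelayTimeLevel(3) in the producer example.
        System.out.println("level 3 = " + table.get(3) + " ms");
    }
}
```

With the default string, level 3 is the third entry, 10s, which matches the 10-second delay observed in the verification step.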

1 Transaction Message

Apache RocketMQ has supported distributed transaction messages since version 4.3.0. RocketMQ implements transaction messages using the 2PC (two-phase commit) protocol, and adds compensation logic to handle timeouts or failures in the commit phase, as shown below.

1.1 The Process of RocketMQ Transaction Message

The picture above shows the overall architecture of the transaction message, including the sending of the message (commit-request phase), the sending of Commit/Rollback (commit phase), and the compensation process.

1 The sending of the message and Commit/Rollback.

(1) The producer sends the message (called a Half message in RocketMQ).

(2) The server responds with the write result (success or failure) of the Half message.

(3) The producer handles the local transaction according to the result (the local transaction is not executed when the write fails).

(4) The producer sends Commit/Rollback to the broker according to the result of the local transaction (Commit generates the message index and makes the message visible to consumers).

2 Compensation process

(1) For a transaction message without a Commit/Rollback (that is, a message in pending status), the broker initiates a “back-check” request.

(2) The Producer receives the “back-check” request and checks the status of the local transaction corresponding to the “back-check” message.

(3) The Producer redoes Commit or Rollback based on the local transaction status.

The compensation phase resolves timeouts or failures of the message Commit or Rollback.
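
The two flows above can be summarized as a tiny state model. This is a simplified sketch for illustration only: `TransactionFlowSketch`, its enums, and its method names are hypothetical and do not correspond to RocketMQ classes, and broker storage and networking are left out entirely.

```java
import java.util.function.Supplier;

// Hypothetical model of the send flow and the compensation (back-check) flow.
final class TransactionFlowSketch {
    enum LocalState { COMMIT, ROLLBACK, UNKNOWN }
    enum Outcome { NOT_SENT, VISIBLE, DISCARDED, PENDING }

    // Steps (1)-(4): the local transaction runs only if the Half message was written.
    static Outcome send(boolean halfWriteOk, Supplier<LocalState> localTransaction) {
        if (!halfWriteOk) {
            return Outcome.NOT_SENT; // step (3): local transaction is skipped on failure
        }
        return resolve(localTransaction.get());
    }

    // Compensation: the broker asks the producer for the local transaction status.
    static Outcome backCheck(Supplier<LocalState> checkLocalTransaction) {
        return resolve(checkLocalTransaction.get());
    }

    private static Outcome resolve(LocalState state) {
        switch (state) {
            case COMMIT:
                return Outcome.VISIBLE;   // index is built, consumers can see the message
            case ROLLBACK:
                return Outcome.DISCARDED; // message never becomes visible
            default:
                return Outcome.PENDING;   // still undetermined, eligible for a back-check
        }
    }

    public static void main(String[] args) {
        Outcome first = send(true, () -> LocalState.UNKNOWN);
        System.out.println("after send: " + first);
        if (first == Outcome.PENDING) {
            System.out.println("after back-check: " + backCheck(() -> LocalState.COMMIT));
        }
    }
}
```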

1.2 The design of RocketMQ Transaction Message

1 Transaction message is invisible to users in the first phase (commit-request phase)

In the main process of a transaction message, the first-phase message is invisible to the user. This is also the biggest difference from a normal message. So how is the message written while remaining invisible to the user? RocketMQ's solution is as follows: if the message is a Half message, the topic and queueId of the original message are backed up, and the topic is then changed to RMQ_SYS_TRANS_HALF_TOPIC. Since the consumer group does not subscribe to this topic, the consumer cannot consume the Half message. RocketMQ then starts a timed task that pulls messages from RMQ_SYS_TRANS_HALF_TOPIC, obtains a channel according to the producer group, sends a back-check to query the local transaction status, and decides whether to commit or roll back the message according to that status.

In RocketMQ, the storage structure of the message in the broker is as follows. Each message has corresponding index information. The Consumer reads the content of the message through the secondary index of the ConsumeQueue. The flow is as follows:

The specific implementation strategy of RocketMQ is: when a transaction message is written, the topic and queueId of the message are replaced, and the original topic and queueId are stored in the properties of the message. Because the topic is replaced, the message is not forwarded to the Consumer Queue of the original topic, so the consumer cannot perceive the message and will not consume it. In fact, changing the topic is a conventional technique in RocketMQ (recall the implementation mechanism of the delay message).
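
The topic replacement can be sketched in a few lines of plain Java. Everything except the topic name RMQ_SYS_TRANS_HALF_TOPIC is an assumption made for illustration: the `Msg` type, the property names REAL_TOPIC and REAL_QID, and the choice of queue 0 for the half topic are hypothetical and are not taken from the text above.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of hiding a message behind the half topic and restoring it.
final class HalfMessageSketch {
    static final String HALF_TOPIC = "RMQ_SYS_TRANS_HALF_TOPIC";
    // Assumed property names used to back up the original destination.
    static final String REAL_TOPIC = "REAL_TOPIC";
    static final String REAL_QUEUE_ID = "REAL_QID";

    // Stand-in for a stored message.
    static final class Msg {
        String topic;
        int queueId;
        final Map<String, String> properties = new HashMap<>();

        Msg(String topic, int queueId) {
            this.topic = topic;
            this.queueId = queueId;
        }
    }

    // First phase: back up the destination, then redirect to the half topic.
    static void toHalf(Msg m) {
        m.properties.put(REAL_TOPIC, m.topic);
        m.properties.put(REAL_QUEUE_ID, String.valueOf(m.queueId));
        m.topic = HALF_TOPIC;
        m.queueId = 0; // assumption: the half topic uses a single queue
    }

    // Commit: restore the real destination so the message can be written normally.
    static void restore(Msg m) {
        m.topic = m.properties.remove(REAL_TOPIC);
        m.queueId = Integer.parseInt(m.properties.remove(REAL_QUEUE_ID));
    }

    public static void main(String[] args) {
        Msg m = new Msg("TopicTest1234", 3);
        toHalf(m);
        System.out.println(m.topic + " / " + m.properties);
        restore(m);
        System.out.println(m.topic + " / queue " + m.queueId);
    }
}
```

Consumers subscribed to the original topic never see the message while it sits under the half topic, because no index entry is created for the original topic until the destination is restored.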

2 Commit/Rollback operation and introduction of the Op message

After a message that is invisible to the user has been written in the first phase, there are two cases in the second phase. One is the Commit operation, after which the message needs to be visible to the user; the other is the Rollback operation, after which the first-phase message (Half message) needs to be revoked. For Rollback, since the first-phase message itself is invisible to the user, there is no need to actually revoke it (in fact, RocketMQ cannot actually delete a message, because the file is written sequentially). However, some operation is still needed to identify the final status of the message and distinguish it from a pending message. To do this, the concept of the "Op message" is introduced, which marks that the message has a definite status (Commit or Rollback). If a transaction message does not have a corresponding Op message, the status of the transaction is still undetermined (probably because the second phase failed). RocketMQ records an Op message for every Half message, regardless of whether it is a Commit or a Rollback. The only difference between Commit and Rollback is that, for Commit, the index of the Half message is created before the Op message is written.

3 How the Op message is stored and the correspondence between Op message and Half message

RocketMQ writes the Op message to a specific system topic (RMQ_SYS_TRANS_OP_HALF_TOPIC), which is created via the method TransactionalMessageUtil.buildOpTopic(). This topic is an internal topic (like RMQ_SYS_TRANS_HALF_TOPIC) and will not be consumed by the user. The content of the Op message is the physical offset of the corresponding Half message. Through the Op message, we can locate the Half message for the subsequent check-back operation.

4 Index construction of Half messages

When performing the Commit operation of the second phase, the index of the Half message needs to be built. Since the Half message was written to a special topic (RMQ_SYS_TRANS_HALF_TOPIC) in the first phase of 2PC, it must be read back from that topic when building the index, have its topic and queueId replaced with the real target topic and queueId, and then be written as a normal message that is visible to the user. In short, the second phase recovers a complete normal message from the content of the Half message stored in the first phase, and then goes through the normal message-writing process.

5 How to handle a message that failed in the second phase?

The commit/rollback phase may fail, for example when a network problem causes the Commit to fail. A strategy is then required to make sure the message is eventually committed. RocketMQ uses a compensation mechanism called "back-check". The broker initiates a back-check request for a message in pending status and sends the request to the corresponding producer side (a producer in the same producer group as the one that sent the Half message). The producer checks the status of the local transaction and redoes Commit or Rollback. The broker performs the back-check by comparing the RMQ_SYS_TRANS_HALF_TOPIC messages with the RMQ_SYS_TRANS_OP_HALF_TOPIC messages, and advances the checkpoint (recording the transaction messages whose status is certain).

RocketMQ does not back-check the status of transaction messages endlessly. The default maximum number of checks is 15. If the transaction status is still unknown after 15 checks, RocketMQ will roll back the message by default.
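
A rough sketch of the two ideas in this section, finding pending Half messages by comparing them with Op messages and bounding the number of back-checks, might look as follows. `BackCheckSketch` is hypothetical: real brokers work on queue offsets and checkpoints in storage, whereas this model uses in-memory collections, and the limit of 15 is simply the default quoted above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical in-memory model of the broker-side back-check bookkeeping.
final class BackCheckSketch {
    static final int MAX_CHECKS = 15; // default limit quoted in the text

    private final Map<Long, Integer> checkCounts = new HashMap<>();

    /** Half messages whose offset has no matching Op message are still pending. */
    static List<Long> pending(List<Long> halfOffsets, Set<Long> opOffsets) {
        List<Long> result = new ArrayList<>();
        for (Long offset : halfOffsets) {
            if (!opOffsets.contains(offset)) {
                result.add(offset);
            }
        }
        return result;
    }

    /** Returns true while another back-check is allowed for this Half message. */
    boolean shouldCheck(long halfOffset) {
        int count = checkCounts.merge(halfOffset, 1, Integer::sum);
        return count <= MAX_CHECKS;
    }

    public static void main(String[] args) {
        List<Long> half = Arrays.asList(100L, 200L, 300L);
        Set<Long> op = new HashSet<>(Arrays.asList(200L)); // 200 already has a final status
        System.out.println("pending: " + pending(half, op));

        BackCheckSketch tracker = new BackCheckSketch();
        int allowed = 0;
        while (tracker.shouldCheck(100L)) {
            allowed++;
        }
        System.out.println("back-checks allowed for offset 100: " + allowed);
    }
}
```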

A RocketMQ cluster mainly includes four roles: NameServer, Broker (Master/Slave), Producer and Consumer. The basic communication process is as follows:

(1) After a Broker starts up, it registers itself with NameServer and then reports Topic routing information to NameServer at regular intervals of 30 seconds.

(2) When a Producer sends a message as a client, it first looks up the routing information for the message's Topic in the local cache TopicPublishInfoTable. If the cache has no entry, the routing information is retrieved from NameServer and the local cache is updated. In addition, the Producer refreshes routing information from NameServer every 30 seconds by default.

(3) The Producer chooses a queue to send the message to according to the routing information obtained in (2). The Broker, as the receiver, accepts the message and records it on disk.

(4) After the Consumer obtains the routing information as in (2) and completes client-side load balancing, it selects one or several message queues, pulls messages from them and consumes them.
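Step (2) is a cache-then-fetch lookup with a periodic refresh. The sketch below models it under simplifying assumptions: the NameServer is replaced by a plain function, a route is just a list of queue names, and the class and method names are hypothetical. Only the cache name TopicPublishInfoTable and the 30-second interval come from the text.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Illustrative model only; not the RocketMQ client implementation. */
class RouteCacheSketch {
    private final Map<String, List<String>> topicPublishInfoTable = new ConcurrentHashMap<>();
    private final Function<String, List<String>> nameServer; // stand-in for a NameServer RPC

    RouteCacheSketch(Function<String, List<String>> nameServer) {
        this.nameServer = nameServer;
    }

    /** Returns the cached route, asking the NameServer only on a cache miss. */
    List<String> findRoute(String topic) {
        return topicPublishInfoTable.computeIfAbsent(topic, nameServer);
    }

    /** A real client would schedule this periodically (every 30 seconds by default, per the text). */
    void refreshAll() {
        for (String topic : topicPublishInfoTable.keySet()) {
            topicPublishInfoTable.put(topic, nameServer.apply(topic));
        }
    }

    public static void main(String[] args) {
        RouteCacheSketch client = new RouteCacheSketch(topic -> List.of(topic + "-queue-0", topic + "-queue-1"));
        System.out.println(client.findRoute("TopicTest")); // first call hits the NameServer stand-in
        System.out.println(client.findRoute("TopicTest")); // second call is served from the cache
    }
}
```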

From (1) to (3) above, we can see that Producer, Broker and NameServer all communicate with each other (only part of the MQ communication is mentioned here), so a well-designed network communication module is very important in an MQ. It determines the overall messaging capability and final performance of the RocketMQ cluster.

The rocketmq-remoting module is responsible for network communication in RocketMQ. Almost all other modules that need network communication (such as rocketmq-client, rocketmq-broker and rocketmq-namesrv) depend on it. To exchange requests and responses efficiently between client and server, RocketMQ defines its own communication protocol and extends the communication module on top of Netty.

1 Remoting Communication Class Structure

2 Protocol Design and Code

When a message is sent between Client and Server, a protocol convention is needed for the message sent, so it is necessary to customize the message protocol of RocketMQ. At the same time, in order to efficiently transmit messages and read the received messages, it is necessary to encode and decode the messages. In RocketMQ, the RemotingCommand class encapsulates all data content in the process of message transmission, which includes not only all data structures, but also encoding and decoding operations.

| Header field | Type | Request desc | Response desc |
|---|---|---|---|
| code | int | Request code. The responder performs different business processing according to the request code | Response code. 0 means success, and non-zero means an error |
| language | LanguageCode | Language implemented by the requester | Language implemented by the responder |
| version | int | Version of the requesting program | Version of the responding program |
| opaque | int | Equivalent to a requestId. Different requests on the same connection carry different identifiers, which are matched against the response | Returned unmodified from the request |
| flag | int | Flag used to distinguish between ordinary RPC and oneway RPC | Flag used to distinguish between ordinary RPC and oneway RPC |
| remark | String | Custom text information | Custom text information |
| extFields | HashMap | Custom extension information of the request | Custom extension information of the response |

From the above figure, the transport content can be divided into four parts:

(1) Message length: total length, four bytes of storage, occupying an int type;

(2) Serialization type and header length: together occupying an int type. The first byte represents the serialization type, and the last three bytes represent the header length;

(3) Header data: serialized header data;

(4) Message body data: binary byte data content of message body;
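The four parts can be sketched with a `ByteBuffer`. This is a minimal illustration, not the RemotingCommand code: the class and method names are hypothetical, the header is treated as an opaque byte array, and it is an assumption of this sketch that the total length counts everything after the length field itself.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Illustrative frame codec; names are hypothetical, not RocketMQ classes. */
class FrameSketch {
    /** Builds [total length:4][serialization type:1 | header length:3][header][body]. */
    static ByteBuffer encode(byte serializeType, byte[] header, byte[] body) {
        // Assumption: the length field counts the bytes that follow it.
        int length = 4 + header.length + body.length;
        ByteBuffer buf = ByteBuffer.allocate(4 + length);
        buf.putInt(length);
        // First byte carries the serialization type, the last three bytes the header length.
        buf.putInt((serializeType << 24) | (header.length & 0x00FFFFFF));
        buf.put(header);
        buf.put(body);
        buf.flip();
        return buf;
    }

    /** Reads one frame and returns {header, body} as UTF-8 strings. */
    static String[] decode(ByteBuffer buf) {
        int length = buf.getInt();
        int mark = buf.getInt();
        int headerLength = mark & 0x00FFFFFF;
        byte[] header = new byte[headerLength];
        buf.get(header);
        byte[] body = new byte[length - 4 - headerLength];
        buf.get(body);
        return new String[] {
            new String(header, StandardCharsets.UTF_8),
            new String(body, StandardCharsets.UTF_8)
        };
    }

    public static void main(String[] args) {
        byte[] header = "{\"code\":10}".getBytes(StandardCharsets.UTF_8);
        byte[] body = "Hello RocketMQ".getBytes(StandardCharsets.UTF_8);
        String[] parts = decode(encode((byte) 0, header, body));
        System.out.println(parts[0] + " / " + parts[1]);
    }
}
```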

3 Message Communication Mode and Procedure

RocketMQ supports three main communication modes: synchronous (sync), asynchronous (async) and one-way (oneway). The one-way mode is relatively simple and is generally used for sending heartbeat packets, where the Response is of no interest. This section mainly introduces the asynchronous communication flow of RocketMQ.
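The key to asynchronous communication is matching a response to its request through the opaque field. The sketch below shows that idea only; it is not the RocketMQ client code. All names are hypothetical, and requests and responses are plain strings instead of encoded commands.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/** Illustrative model of async request/response matching by opaque id. */
class AsyncInvokeSketch {
    private final AtomicInteger nextOpaque = new AtomicInteger();
    private final Map<Integer, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    /** Registers a pending request and returns its opaque id without blocking. */
    int invokeAsync(String request, CompletableFuture<String> callback) {
        int opaque = nextOpaque.incrementAndGet();
        pending.put(opaque, callback);
        // A real client would now write the encoded request to the network channel.
        return opaque;
    }

    /** Called by the I/O thread when a response carrying the same opaque id arrives. */
    void onResponse(int opaque, String response) {
        CompletableFuture<String> callback = pending.remove(opaque);
        if (callback != null) {
            callback.complete(response);
        }
    }

    public static void main(String[] args) {
        AsyncInvokeSketch client = new AsyncInvokeSketch();
        CompletableFuture<String> first = new CompletableFuture<>();
        CompletableFuture<String> second = new CompletableFuture<>();
        int a = client.invokeAsync("send A", first);
        int b = client.invokeAsync("send B", second);
        // Responses may arrive in any order; the opaque id restores the pairing.
        client.onResponse(b, "ack B");
        client.onResponse(a, "ack A");
        System.out.println(first.join() + ", " + second.join());
    }
}
```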

4 Reactor Multithread Design

The RPC communication of RocketMQ uses Netty component as the underlying communication library, and also follows the Reactor multithread model. At the same time, some extensions and optimizations are made on it.

The block diagram above gives a rough picture of the Reactor multithread model of NettyRemotingServer in RocketMQ. A Reactor main thread (eventLoopGroupBoss, the “1” above) is responsible for listening for TCP connection requests, establishing connections, creating SocketChannels and registering them on a selector. The RocketMQ source code automatically chooses between NIO and Epoll according to the type of OS. The selector then listens for the actual network data. Once network data arrives, it is handed to the Worker thread pool (eventLoopGroupSelector, the “N” above, 3 by default in the source code). Before the business logic is actually executed, SSL verification, encoding and decoding, idle checks and network connection management are required. These tasks are handled by defaultEventExecutorGroup (the “M1” above, 8 by default in the source code). The business operations themselves are executed in business thread pools. According to the request code of the RemotingCommand, the corresponding processor is looked up in the processorTable local cache variable, wrapped into a task and submitted to that processor's thread pool for execution (for example sendMessageExecutor for sending a message, the “M2” above). The thread pools grow step by step from the entry point to the business logic, which reflects the complexity of each step: the more complex the step, the wider the concurrent channel it requires.

| Number of threads | Thread name | Thread description |
|---|---|---|
| 1 | NettyBoss_%d | Reactor main thread |
| N | NettyServerEPOLLSelector_%d_%d | Reactor thread pool |
| M1 | NettyServerCodecThread_%d | Worker thread pool |
| M2 | RemotingExecutorThread_%d | Business processor thread pool |
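The last hop, looking up a processor by request code and running it on that processor's own thread pool, can be sketched as follows. This is a simplified model rather than the NettyRemotingServer code: the class name, the `register`/`dispatch` methods, the request code 10 and the pool size are assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

/** Illustrative request-code dispatch table; not the RocketMQ implementation. */
class ProcessorTableSketch {
    /** A processor paired with the thread pool that should run it. */
    static final class Entry {
        final Function<String, String> processor;
        final ExecutorService executor;

        Entry(Function<String, String> processor, ExecutorService executor) {
            this.processor = processor;
            this.executor = executor;
        }
    }

    private final Map<Integer, Entry> processorTable = new HashMap<>();

    void register(int requestCode, Function<String, String> processor, ExecutorService executor) {
        processorTable.put(requestCode, new Entry(processor, executor));
    }

    /** Finds the processor for the request code and runs it on its own business pool. */
    CompletableFuture<String> dispatch(int requestCode, String body) {
        Entry entry = processorTable.get(requestCode);
        if (entry == null) {
            throw new IllegalArgumentException("unsupported request code " + requestCode);
        }
        return CompletableFuture.supplyAsync(() -> entry.processor.apply(body), entry.executor);
    }

    public static void main(String[] args) {
        ExecutorService sendMessageExecutor = Executors.newFixedThreadPool(4); // hypothetical size
        ProcessorTableSketch server = new ProcessorTableSketch();
        server.register(10, body -> "stored: " + body, sendMessageExecutor); // 10 is a made-up code
        System.out.println(server.dispatch(10, "Hello RocketMQ").join());
        sendMessageExecutor.shutdown();
    }
}
```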

RocketMQ supports message queries by two dimensions, which are “Query Message by Message Id” and “Query Message by Message Key”.

1. Query Message by Message Id

The MessageId in RocketMQ has a total length of 16 bytes and consists of the broker address (IP address and port) and the CommitLog offset. To query by MessageId, the Client parses the Broker's address (IP address and port) and the CommitLog offset out of the MessageId, wraps them into an RPC request and sends it through the communication layer (business request code: VIEW_MESSAGE_BY_ID). The Broker uses the CommitLog offset and size to locate the real message in the CommitLog and returns it, which is how QueryMessageProcessor works.
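Parsing such an ID can be sketched as follows. The text only states the 16-byte total and its contents; the split into a 4-byte IPv4 address, a 4-byte port and an 8-byte offset, and the hexadecimal string form, are assumptions of this sketch, and the class is hypothetical rather than the client's own decoder.

```java
import java.nio.ByteBuffer;

/** Illustrative MessageId codec, assuming 4-byte IPv4 + 4-byte port + 8-byte CommitLog offset. */
class MessageIdSketch {
    /** Packs the three fields into 16 bytes and renders them as 32 hex characters. */
    static String encode(byte[] ipv4, int port, long commitLogOffset) {
        ByteBuffer buf = ByteBuffer.allocate(16);
        buf.put(ipv4).putInt(port).putLong(commitLogOffset);
        StringBuilder hex = new StringBuilder(32);
        for (byte b : buf.array()) {
            hex.append(String.format("%02X", b));
        }
        return hex.toString();
    }

    /** Returns "ip:port@offset" parsed from the hex form produced by encode. */
    static String decode(String msgId) {
        byte[] raw = new byte[16];
        for (int i = 0; i < raw.length; i++) {
            raw[i] = (byte) Integer.parseInt(msgId.substring(2 * i, 2 * i + 2), 16);
        }
        ByteBuffer buf = ByteBuffer.wrap(raw);
        String ip = (buf.get() & 0xFF) + "." + (buf.get() & 0xFF) + "."
                + (buf.get() & 0xFF) + "." + (buf.get() & 0xFF);
        int port = buf.getInt();
        long offset = buf.getLong();
        return ip + ":" + port + "@" + offset;
    }

    public static void main(String[] args) {
        String id = encode(new byte[] {(byte) 192, (byte) 168, 0, 1}, 10911, 1073741824L);
        System.out.println(id + " -> " + decode(id));
    }
}
```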

2. Query Message by Message Key

“Query Messages by Message Key” is mainly based on RocketMQ’s IndexFile. The logical structure of the IndexFile is similar to the implementation of HashMap in JDK. The specific structure of the IndexFile is as follows:

The IndexFile provides the “Query Messages by Message Key” service to the user. The IndexFile is stored in {fileName}, and the file is named after the timestamp at the time of its creation. The file size is fixed at 420,000,040 bytes (40 + 5 million × 4 + 20 million × 20). If UNIQ_KEY is set in the properties of the message, then “topic + ‘#’ + UNIQ_KEY” is used as the index key. Likewise, if KEYS is set in the properties of the message (multiple keys should be separated by spaces), then “topic + ‘#’ + KEY” is used as the index key.
Each index entry contains four fields, Key Hash, CommitLog offset, Timestamp and NextIndex offset, for a total of 20 bytes. If the Key Hash of an index entry is the same as that of a previous entry, its NextIndex offset points to that previous entry. In other words, when a hash conflict occurs, the NextIndex offset field strings all conflicting entries together in a linked list. The Timestamp field records the difference between two storeTimestamps rather than an absolute time. The structure of the entire IndexFile is shown in the graph. The Header stores some general statistics and takes 40 bytes. The Slot Table of 4 × 5 million bytes does not hold the real index data; it holds the head of the singly linked list corresponding to each slot. The Index Linked List of 20 × 20 million bytes is the real index data, that is, one IndexFile can hold 20 million index entries.
To query a message by message key, the topic and message key are used to find the record in the IndexFile, and the message is then read from the CommitLog file according to the CommitLog offset in that record.
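The size arithmetic and the positions implied by this layout can be checked with a few lines of Java. The constants come from the text above; the class and method names are hypothetical, and mapping a key to a slot with `String.hashCode` and `Math.floorMod` is an assumption of this sketch, not necessarily what the broker does.

```java
/** Illustrative IndexFile layout arithmetic: header, then slot table, then index entries. */
class IndexFileLayoutSketch {
    static final int HEADER_SIZE = 40;         // bytes
    static final int SLOT_SIZE = 4;            // bytes per slot
    static final int INDEX_SIZE = 20;          // bytes per index entry
    static final int SLOT_COUNT = 5_000_000;
    static final int INDEX_COUNT = 20_000_000;

    static long fileSize() {
        return HEADER_SIZE + (long) SLOT_COUNT * SLOT_SIZE + (long) INDEX_COUNT * INDEX_SIZE;
    }

    /** Byte position of the slot for "topic#key"; the hashing scheme is an assumption. */
    static long slotPosition(String topic, String key) {
        int slot = Math.floorMod((topic + "#" + key).hashCode(), SLOT_COUNT);
        return HEADER_SIZE + (long) slot * SLOT_SIZE;
    }

    /** Byte position of the n-th index entry (0-based) in the index linked list area. */
    static long indexPosition(int n) {
        return HEADER_SIZE + (long) SLOT_COUNT * SLOT_SIZE + (long) n * INDEX_SIZE;
    }

    public static void main(String[] args) {
        System.out.println("file size      = " + fileSize());
        System.out.println("slot position  = " + slotPosition("TopicTest", "order-1001"));
        System.out.println("first index at = " + indexPosition(0));
    }
}
```

A lookup would read the slot to get the head of the chain, then follow NextIndex offsets through the index area, comparing the Key Hash of each entry.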

Message storage is the most complicated and important part of RocketMQ. This section describes three aspects of message storage:

  • Message storage architecture
  • PageCache and memory mapping
  • RocketMQ's two different disk flushing methods.

1 Message Storage Architecture

The message storage architecture involves three kinds of files: the CommitLog file, the ConsumeQueue file and the IndexFile.

  • CommitLog: The CommitLog file stores the message body and metadata sent by the producer, and the message content is not of fixed length. The default size of one CommitLog file is 1G. The file name is 20 digits long: it is the starting offset of the file, zero-padded on the left. For example, 00000000000000000000 is the first file, with a starting offset of 0 and a file size of 1G = 1073741824. When the first CommitLog file is full, the second CommitLog file is 00000000001073741824, with a starting offset of 1073741824, and so on. Messages are appended to the log file sequentially. When one CommitLog file is full, writing continues in the next one.
  • ConsumeQueue: The ConsumeQueue is used to improve the performance of message consumption. Since RocketMQ uses a topic-based subscription mode, message consumption is specific to the topic, and traversing the CommitLog file to retrieve the messages of one topic would be very inefficient. Instead, the consumer finds the messages to consume through the ConsumeQueue. The ConsumeQueue (logical consume queue) serves as an index for consumption: for each message of the specified topic it stores the starting physical offset in the CommitLog, the message size and the hash code of the message tag. The ConsumeQueue file can be regarded as a topic-based index of the CommitLog, so the consumequeue folder has a three-layer topic/queue/file structure, and the specific storage path is $HOME/store/consumequeue/{topic}/{queueId}/{fileName}. The ConsumeQueue file uses a fixed-length design. Each entry occupies 20 bytes: an 8-byte CommitLog physical offset, a 4-byte message length and an 8-byte tag hash code. One ConsumeQueue file consists of 300,000 entries, each entry can be accessed randomly like an array element, and each ConsumeQueue file is about 5.72MB in size.
  • IndexFile: The IndexFile provides a way to query messages by key or time interval. The path of the IndexFile is $HOME/store/index/${fileName}, where fileName is the timestamp at which the file was created. One IndexFile is about 400M in size and can store 20 million index entries. The underlying storage of the IndexFile implements a HashMap structure in the file system, so RocketMQ's index file is a hash index.
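The naming and sizing rules in the list above reduce to simple arithmetic. The following sketch uses only the figures given in the text (1G files, 20-digit names, 20-byte entries, 300,000 entries per ConsumeQueue file); the class and method names are hypothetical.

```java
/** Illustrative arithmetic for CommitLog file names and ConsumeQueue sizes. */
class StoreFileSketch {
    static final long COMMIT_LOG_FILE_SIZE = 1L << 30; // 1G = 1073741824 bytes
    static final int CQ_ENTRY_SIZE = 20;               // 8-byte offset + 4-byte size + 8-byte tag hash
    static final int CQ_ENTRIES_PER_FILE = 300_000;

    /** Name of the CommitLog file containing the given physical offset. */
    static String commitLogFileName(long physicalOffset) {
        long startOffset = physicalOffset - physicalOffset % COMMIT_LOG_FILE_SIZE;
        return String.format("%020d", startOffset); // zero-padded to 20 digits
    }

    static long consumeQueueFileSize() {
        return (long) CQ_ENTRIES_PER_FILE * CQ_ENTRY_SIZE;
    }

    /** Byte position of an entry inside its ConsumeQueue file, given its index within the queue. */
    static long consumeQueueEntryPosition(long entryIndex) {
        return (entryIndex % CQ_ENTRIES_PER_FILE) * CQ_ENTRY_SIZE;
    }

    public static void main(String[] args) {
        System.out.println(commitLogFileName(0));
        System.out.println(commitLogFileName(COMMIT_LOG_FILE_SIZE + 100));
        System.out.printf("ConsumeQueue file: %d bytes (about %.2f MB)%n",
                consumeQueueFileSize(), consumeQueueFileSize() / (1024.0 * 1024.0));
    }
}
```

Because every ConsumeQueue entry has the same size, the position of the n-th entry is a multiplication, which is what makes array-like random access possible.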

From the above architecture of RocketMQ message storage, we can see that RocketMQ uses a hybrid storage structure: all the queues in one broker instance share a single log file, the CommitLog, to store messages. Within this hybrid structure (messages of multiple topics stored in one CommitLog), the data part and the index part are stored separately, for the Producer and the Consumer respectively. The Producer sends a message to the Broker, and the Broker persists it to the CommitLog file synchronously or asynchronously. As long as the message is persisted to the CommitLog on disk, the message sent by the Producer will not be lost, and the Consumer will therefore always have the opportunity to consume it. When no message can be pulled, the consumer can wait for the next pull. The server also supports a long polling mode: if a pull request finds no messages, the Broker can hold it for up to 30 seconds, and as soon as a new message arrives within this interval it is returned directly to the consumer. To build the index side, RocketMQ uses the Broker's background service thread ReputMessageService, which continuously dispatches requests and asynchronously builds the ConsumeQueue (logical queue) and IndexFile data.

2 PageCache and Memory Map

PageCache is the operating system's cache of file data, used to speed up file reads and writes. In general, sequential reads and writes of files are almost as fast as reads and writes of memory. The main reason is that the OS uses a portion of memory as PageCache to optimize read and write operations. For writes, the OS first writes to the cache, and the pdflush kernel thread then asynchronously flushes the cached data to the physical disk. For reads, if a file read misses the page cache, the OS reads the file from the physical disk and also prefetches the neighboring blocks sequentially.

In RocketMQ, the logical consume queue ConsumeQueue stores little data and is read sequentially. With the help of page cache prefetching, the read performance of the ConsumeQueue file is close to that of a memory read, and even message accumulation does not affect it. The log data file CommitLog, however, generates many random reads when message content is read, which seriously affects performance. Choosing an appropriate IO scheduling algorithm, such as setting it to "Deadline" (when the block storage uses SSD), also improves the performance of random reads.

In addition, RocketMQ mainly reads and writes files through MappedByteBuffer. MappedByteBuffer uses the FileChannel model in NIO to map the physical file on disk directly to a memory address in user space. The mmap approach avoids the overhead of traditional IO, which copies file data back and forth between a buffer in kernel space and a buffer in user space. It turns file operations into direct memory address manipulation, which greatly improves the efficiency of reading and writing files. Because the memory mapping mechanism is used, RocketMQ's storage files are fixed-length, which makes it easy to map an entire file into memory at once.
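A minimal, self-contained illustration of the mechanism with the standard NIO API follows. It is not RocketMQ's mapped file code: the class name, the temporary file and the 4096-byte size are arbitrary choices for the sketch.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

/** Illustrative use of a memory-mapped, fixed-length file. */
class MmapSketch {
    /** Maps the whole fixed-length file, writes text at offset 0 and reads it back. */
    static String writeAndReadBack(Path file, int fileSize, String text) {
        try (FileChannel channel = FileChannel.open(file,
                StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE)) {
            // Mapping past the current end of the file grows it to fileSize bytes.
            MappedByteBuffer mapped = channel.map(FileChannel.MapMode.READ_WRITE, 0, fileSize);
            byte[] data = text.getBytes(StandardCharsets.UTF_8);
            mapped.put(data);   // a plain memory write, no explicit write() call
            mapped.force();     // ask the OS to flush the dirty pages to disk
            byte[] out = new byte[data.length];
            mapped.position(0);
            mapped.get(out);    // a plain memory read
            return new String(out, StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static String demo() {
        try {
            Path file = Files.createTempFile("mapped", ".log");
            file.toFile().deleteOnExit();
            return writeAndReadBack(file, 4096, "Hello RocketMQ");
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```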

3 Message Disk Flush

  • Synchronous flush: As shown above, RocketMQ's Broker returns a successful ACK response to the Producer only after the message has truly been persisted to disk. Synchronous flushing is a strong guarantee for the reliability of MQ messages, but it has a significant impact on performance. It is generally suitable for financial business applications.
  • Asynchronous flush: Asynchronous flushing takes full advantage of the OS PageCache. As soon as the message is written to the PageCache, the successful ACK can be returned to the Producer. The flush itself is performed by a background thread, which reduces read and write latency and improves the performance and throughput of the MQ.
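The difference between the two modes is when the ACK is returned relative to the flush. The sketch below illustrates that ordering with a plain FileChannel; it is not the Broker's flush service, and the class, enum and method names are hypothetical.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Illustrative contrast of synchronous and asynchronous flushing. */
class FlushSketch {
    enum FlushMode { SYNC, ASYNC }

    /** Appends one message and returns true as the "ACK" to the producer. */
    static boolean append(FileChannel channel, byte[] message, FlushMode mode, ExecutorService flusher)
            throws IOException {
        ByteBuffer buf = ByteBuffer.wrap(message);
        while (buf.hasRemaining()) {
            channel.write(buf); // the data reaches the OS page cache
        }
        if (mode == FlushMode.SYNC) {
            channel.force(false); // block until the data is on disk, then ACK
        } else {
            // ACK right away; a background thread forces the data to disk later.
            flusher.submit(() -> { channel.force(false); return null; });
        }
        return true;
    }

    /** Writes one message per mode to a temporary file and returns the resulting file size. */
    static long demo() {
        ExecutorService flusher = Executors.newSingleThreadExecutor();
        try {
            Path file = Files.createTempFile("commitlog", ".tmp");
            file.toFile().deleteOnExit();
            try (FileChannel channel = FileChannel.open(file, StandardOpenOption.WRITE)) {
                append(channel, "hello".getBytes(StandardCharsets.UTF_8), FlushMode.SYNC, flusher);
                append(channel, "world".getBytes(StandardCharsets.UTF_8), FlushMode.ASYNC, flusher);
                flusher.shutdown();
                flusher.awaitTermination(5, TimeUnit.SECONDS); // let the async flush finish before closing
                return channel.size();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            flusher.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("bytes written: " + demo());
    }
}
```

In the asynchronous branch a crash between the ACK and the background flush can lose the message, which is the reliability cost that buys the lower latency.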