rocketmq消息可靠性
消息重试
指的是消费者消费消息失败后,mq需要让消费者重新消费一次。
只有消费模式处于集群模式下时,才会有重试机制,广播模式下是没有重试机制的。
消费者消费消息失败通常有以下两种情况:
- 消息的原因,反序列化失败,消息本身数据无法处理等。这种错误通常需要跳过这条消息,再消费其它消息,而这条失败的消息即使立刻重试消费,99%也不成功,所以最好提供一种定时重试机制,即过10秒后再重试。
- 消费者依赖的下游服务不可用,遇到这种错误,即使跳过当前失败的消息,消费其他消息同样也会报错。这种情况建议应用sleep 30s,再消费下一条消息,这样可以减轻Broker重试消息的压力。
RocketMQ会为每个消费组都设置一个Topic名称为“%RETRY%+consumerGroup”的重试队列(这里需要注意的是,这个Topic的重试队列是针对消费组,而不是针对每个Topic设置的),用于暂时保存因为各种异常而导致Consumer端无法消费的消息。
考虑到异常恢复起来需要一些时间,会为重试队列设置多个重试级别,每个重试级别都有与之对应的重新投递延时,重试次数越多投递延时就越大。
RocketMQ对于重试消息的处理是先保存至Topic名称为“SCHEDULE_TOPIC_XXXX”的延迟队列中,后台定时任务按照对应的时间进行Delay后重新保存至“%RETRY%+consumerGroup”的重试队列中。
实际上,以下情况会触发消息重试:
- 业务消费方返回ConsumeConcurrentlyStatus.RECONSUME_LATER。
- 业务消费方返回null。
- 业务消费方抛出异常。
对于抛出异常的情况,只要我们在业务逻辑中显式抛出异常或者非显式抛出异常,broker也会重新投递消息,如果业务对异常做了捕获,那么该消息将不会发起重试。因此对于需要重试的业务,消费方在捕获异常时要注意返回ConsumeConcurrentlyStatus.RECONSUME_LATER或null,输出日志并打印当前重试次数。推荐返回ConsumeConcurrentlyStatus.RECONSUME_LATER。
RocketMQ可在broker.conf文件中配置Consumer端的重试次数和重试时间间隔,也可以在代码里控制重试次数。
重试逻辑
rocketmq使用时间衰减策略,进行重试。
时间间隔可以为:
1 | messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h |
重试逻辑:
- 首先判断消费端有没有显式设置最大重试次数,如果没有就默认16次。
- 当消息发送失败,mq会发起消费重试。
- 判断消息当前重试次数是否等于大于最大重试次数,如果达到,或者配置的次数小于0,获取死信队列,将超时的消息投递至死信队列中。
- 正常需要重试的消息,将采用延时消息的模式进行重试。将新的延时消息(就是原来的需要重试的消息)重新刷盘。采用定时任务进行投递。
对于重试的消息,mq并不会从原队列获取消息,而是创建了一个新的topic进行保存。
对于所有消费者消费失败的消息,rocketMQ都会把重试的消息 重新new出来(即上文提到的MessageExtBrokerInner对象),然后投递到主题SCHEDULE_TOPIC_XXXX下的队列中,然后由定时任务进行调度重试,而重试的周期符合我们在上文中提到的delayLevel周期。
1 | 重试代码还没有搞上来 |
消息重投(生产者发送失败,未收到mq的ACK)
生产者在发送消息时,同步消息失败则会重投,异步消息失败有重试,oneway没有任何保证。消息重投保证消息尽可能发送成功、不丢失,但可能会造成消息重复,消息重复在RocketMQ中是无法避免的问题。
消息重复在一般情况下不会发生,当出现消息量大、网络抖动,消息重复就会是大概率事件。另外,生产者主动重发、consumer负载变化也会导致重复消息。
如下方法可以设置消息重试策略:
- retryTimesWhenSendFailed:同步发送失败重投次数,默认为2,因此生产者会最多尝试发送retryTimesWhenSendFailed + 1次。不会选择上次失败的broker,尝试向其他broker发送,最大程度保证消息不丢。超过重投次数,抛出异常,由客户端保证消息不丢。当出现RemotingException、MQClientException和部分MQBrokerException时会重投。
- retryTimesWhenSendAsyncFailed:异步发送失败重试次数,异步重试不会选择其他broker,仅在同一个broker上做重试,不保证消息不丢。
- retryAnotherBrokerWhenNotStoreOK:消息刷盘(主或备)超时或slave不可用(返回状态非SEND_OK),是否尝试发送到其他broker,默认false。十分重要消息可以开启。
死信队列
消息重试超过一定次数的消息,将会进入死信队列。
死信队列逻辑:
- 首先判断消息当前重试次数是否大于等于16(默认是16),或者消息延迟级别是否小于0。
- 只要满足上述的任意一个条件,设置新的topic(死信topic)为:%DLQ%+consumerGroup。
- 进行前置属性的添加。
备份原先topic和队列id等。
- 将死信消息投递到上述步骤2建立的死信topic对应的死信队列中并落盘,使消息持久化。
死信队列中的消息需要人工介入处理,在RocketMQ中,可以通过使用console控制台对死信队列中的消息进行重发来使得消费者实例再次进行消费。
流量控制
生产者流控,因为broker处理能力达到瓶颈;消费者流控,因为消费能力达到瓶颈。
生产者流控
生产者流控模式下,不会进行消息重投。
- commitLog文件被锁时间超过osPageCacheBusyTimeOutMills时,参数默认为1000ms,返回流控。操作系统页缓存繁忙。
- 如果开启transientStorePoolEnable == true,且broker为异步刷盘的主机,且transientStorePool中资源不足,拒绝当前send请求,返回流控。
- broker每隔10ms检查send请求队列头部请求的等待时间,如果超过waitTimeMillsInSendQueue,默认200ms,拒绝当前send请求,返回流控。
- broker通过拒绝send 请求方式实现流量控制。
消费者流控
消费者流控的结果是降低拉取频率。
- 消费者本地缓存消息数量pullThresholdForQueue时,默认1000。
- 消费者本地缓存消息大小超过pullThresholdSizeForQueue时,默认100MB。
- 消费者本地缓存消息跨度超过consumeConcurrentlyMaxSpan时,默认2000。
消息堆积
个人理解,产生流控时,必定伴随着消息堆积。mq中将消息刷盘,记录消费进度,天然支持消息堆积。
消息持久化
mq支持消息的高可靠,影响可靠性的几种情况:
- Broker非正常关闭
- Broker异常Crash
- OS Crash
- 机器掉电,但是能立即恢复供电情况
- 机器无法开机(可能是cpu、主板、内存等关键设备损坏)
- 磁盘设备损坏
1)、2)、3)、4) 四种情况都属于硬件资源可立即恢复情况,RocketMQ在这四种情况下能保证消息不丢,或者丢失少量数据(依赖刷盘方式是同步还是异步)。
5)、6)属于单点故障,且无法恢复,一旦发生,在此单点上的消息全部丢失。RocketMQ在这两种情况下,通过异步复制,可保证99%的消息不丢,但是仍然会有极少量的消息可能丢失。通过同步双写技术可以完全避免单点,同步双写势必会影响性能,适合对消息可靠性要求极高的场合,例如与Money相关的应用。注:RocketMQ从3.0版本开始支持同步双写。
消息刷盘
消息存储是RocketMQ中最为复杂和最为重要的一部分。
MessageQuene与ConsumeQuene
个人理解,MessageQuene是逻辑上的东西,是将Topic进一步划分之后的字主题,也是消息存储队列。
ConsumeQuene又称为消费逻辑队列,是在文件系统中存在实际的文件的,内存存储者Commitlog中消息的位置(commit offset,消息长度,tag的hashcode值)等相关信息,可以看作是消息的索引文件。
根据ConsumeQuene文件的组织方式,可以看出,相同Topic下相同MessageQuene下的ConsumeQuene文件在相同的位置(相同的文件夹内)。
CommitLog文件存储着所有的消息,名称从0开始,文件名称代表了文件中第一个消息的偏移量,文件名称长度20位,每个大小最大1G。
ConsumeQuene文件,按照topic/queue/file三层组织结构,具体存储路径为:$HOME/store/consumequeue/{topic}/{queueId}/{fileName}。这里的queneId就是MessageQuene的id,同样consumequeue文件采取定长设计,每一个条目共20个字节,分别为8字节的commitlog物理偏移量、4字节的消息长度、8字节tag hashcode,单个文件由30W个条目组成,可以像数组一样随机访问每一个条目,每个ConsumeQueue文件大小约5.72M。可以看作是CommitLog文件按照位置,quene的索引文件。
IndexFile,索引文件,提供了一种按照key或者时间区间查询消息的索引。index文件的存储位置是:$HOME \store\index${fileName},文件名fileName是以创建时的时间戳命名的,固定的单个IndexFile文件大小约为400M,一个IndexFile可以保存 2000W个索引,IndexFile的底层存储设计为在文件系统中实现HashMap结构,故rocketmq的索引文件其底层实现为hash索引。
生产者发送消息的过程,会将消息发送到指定的Topic,然后根据负载均衡选择该Topic下的一个MessageQuene,实际上发送消息时,会将消息保存进CommitLog文件,同时会在对应Quene的ConsumeQuene文件中写入索引,也会在Index文件中写入时间索引。
消费者消费时,会根据负载均衡到的MessageQuene定位到具体的文件夹,在根据消息消费进度,确定具体的ConsumeQuene文件,从而在CommitLog文件中查出具体的消息。