rocketmq消息分类
消息
message就是要传输的信息,一条消息必须有一个主题(Topic)。
从特点区分
mq中的消息从特点区分可以分为三类:同步消息,异步消息,单向消息。
同步消息
同步发送是指消息发送方发出数据后,会阻塞直到MQ服务方发回响应消息。
异步消息
异步发送是指发送方发出数据后,不等接收方发回响应,接着发送下个数据包的通讯方式。
MQ 的异步发送,需要用户实现异步发送回调接口(SendCallback),在执行消息的异步发送时,应用不需要等待服务器响应即可直接返回,通过回调接口接收服务器响应,并对服务器的响应结果进行处理。
单向消息
单向(Oneway)发送特点为只负责发送消息,不等待服务器回应且没有回调函数触发,即只发送请求不等待应答。此方式发送消息的过程耗时非常短,一般在微秒级别。这种消息常用于日志收集。
从共功能分
从使用功能也可以分为多种消息,最广为人知的就是事务消息。
消息共有以下分类:
- 普通消息
- 顺序消息
- 广播消息
- 延时消息
- 批量消息
- 事务消息
顺序消息
消息有序指的是一类消息消费时,能按照发送的顺序来消费。
广播消息
广播消息与普通消息不同的是,普通消息只会由消费者组中负载均衡到消息所在MessageQuene的消费者消费,而广播消息由消费者组中的所有消费者消费。
定时消息
定时消息
定时消息(延迟队列)是指消息发送到broker后,不会立即被消费,等待特定时间投递给真正的topic。
broker有配置项messageDelayLevel,可以配置不同的DelayLevel延迟不同的时间。
定时消息会暂存在名为SCHEDULE_TOPIC_XXXX的topic中,并根据delayTimeLevel存入特定的queue,queueId = delayTimeLevel – 1,即一个queue只存相同延迟的消息,保证具有相同发送延迟的消息能够顺序消费。broker会调度地消费SCHEDULE_TOPIC_XXXX,将消息写入真实的topic。
需要注意的是,定时消息会在第一次写入和调度写入真实topic时都会计数,因此发送数量、tps都会变高。
事务消息
官方的事务消息解释
RocketMQ事务消息(Transactional Message)是指应用本地事务和发送消息操作可以被定义到全局事务中,要么同时成功,要么同时失败。RocketMQ的事务消息提供类似X/Open XA的分布事务功能,通过事务消息能达到分布式事务的最终一致。
事务消息的大概流程:
- 发送一个事务的半消息。
- 服务端将消息持久化成功之后,向发送方ACK确认消息已经发送成功,此时消息为半消息。
- 开始执行本地事务,如果发送消息失败,本地事务不执行。
- 根据本地事务向mq进行二次提交,Commit或rollback,commit会生成消息索引,mq收到Commit状态则将半消息标记为可投递,订阅方最终将收到该消息;服务端收到Rollback状态则删除半消息,订阅方将不会接受该消息。
补偿流程:补偿流程用于mq没有收到发送者二次提交的情况下:
- 对没有Commit/Rollback的事务消息(pending状态的消息),从服务端发起一次“回查”。(最大回查15次)
- Producer收到回查消息,检查回查消息对应的本地事务的状态。
- 根据本地事务状态,重新Commit或者Rollback。
注意:
- 需要消息回查接口。
- 不能保证消息幂等,需要消费方手动控制,在消费者未ack情况下,可能重复消费。
消息问题
RocketMQ之二:分布式开放消息系统RocketMQ的原理与实践(消息的顺序问题、重复问题、可靠消息/事务消息)
消息顺序问题
有着一定顺序的消息,一般需要发送到同一个MessageQuene,因为MessageQuene采用先进先出队列,消费方就可以按照发送的顺序进行消费。
可以自定义MessageQueueSelector实现的算法来选择一个队列。
mq顺序消息中,mq需要收到消费方对上一个消息的确认ACK后才能投递下一个消息,这个会带来上一个消息ack收不到的情况下,会造成重复消费。
消息重复问题
mq中并没有处理消息的重复问题,需要消费方自己实现幂等,解决消息重复消费的问题。