由于网络抖动、服务宕机等一些不确定的因素,
RocketMQ
在发送消息的时候很有可能出现消息发送或者消费失败的问题。
所以RocketMQ
消息重试分为2种:
Producer
端重试和Consumer
端重试。
Producer端重试
生产者端的消息失败,也就是
Producer
往MQ
上发消息没有发送成功。
- 比如网络抖动致使生产者发送消息到
MQ
失败。这种消息失败重试可以手动设置发送失败重试的次数。
producer.setRetryTimesWhenSendFailed(3);
官方说明
Producer
的send
方法本身支持内部重试。重试逻辑:
- 默认至多重试2次。
- 这个方法的总耗时时间不超过
sendMsgTimeout
设置的值,默认10s。如果本身向
Broker
发送消息产生超时异常,就不会再重试。
- 以上策略也是在一定程度上保证了消息可以发送成功。
如果业务对消息可靠性要求比较高,建议增加相应的重试逻辑:
- 比如调用
send
同步方法发送失败时,则尝试将消息存储到DB
。- 然后由后台线程定时重试,确保消息一定到达
Broker
。
重试策略
消息发送重试有三种策略:
同步发送失败策略、异步发送失败策略和消息刷盘失败策略。
同步发送失败策略:
普通消息,消息发送默认采用
round-robin
策略(轮转)来选择所发送到的队列。
- 如果发送失败,默认重试2次。
但在重试时是不会选择上次发送失败的
Broker
,而是选择其它Broker
。
DefaultMQProducer producer = new DefaultMQProducer("pg");
// 设置同步发送失败时重试发送的次数,默认为2次
producer.setRetryTimesWhenSendFailed(3);
// 设置发送超时时限为5s,默认10s
producer.setSendMsgTimeout(5000);
异步发送失败策略:
异步发送失败重试时,异步重试不会选择其他
Broker
,仅在当前Broker
上做重试。
- 所以该策略无法保证消息不丢失。
DefaultMQProducer producer = new DefaultMQProducer("pg");
producer.setNamesrvAddr("rocketmqOS:9876");
// 指定异步发送失败后不进行重试发送
producer.setRetryTimesWhenSendAsyncFailed(0);
消息刷盘失败策略:
消息刷盘超时,默认是不会将消息尝试发送到其他
Broker
。对于重要消息可以通过在
Broker
的配置文件设置retryAnotherBrokerWhenNotStoreOK
属性为true
来开启。
几种情况
异步发送在发送过程中出现异常进行重试:
在解析请求结果时,发现响应状态码有其它异常(消息可能未正确被
Broker
处理)会继续进行重试。
- 重试依然选择当前
Broker
。但是如果响应结果不为空的话,即使处理响应时发生异常也不会进行重试。
同步发送时:
如果发送过程中没有异常,但是发送结果不OK,也会选择另一个
Broker
继续进行重试。
顺序消息发送失败不进行重试:
顺序消息:指的是同步+指定消息队列的方式发送。
Consumer端重试
消息正常的到了消费者,结果消费者发生异常,处理失败了。
例如反序列化失败,消息数据本身无法处理等。
顺序消息
顺序消息的消费重试
顺序消息,当
Consumer
消费消息失败后,为了保证消息的顺序性,其会自动不断地进行消息重试,直到消费成功。
- 消费重试默认间隔时间为
1000ms
。重试期间应用会出现消息消费被阻塞的情况。
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("cg");
// 顺序消息消费失败的消费重试时间间隔,单位毫秒,默认为1000,其取值范围为[10, 30000]
consumer.setSuspendCurrentQueueTimeMillis(100);
由于对顺序消息的重试是无休止的,不间断的,直至消费成功。
- 所以,对于顺序消息的消费,务必要保证应用能够及时监控并处理消费失败的情况,避免消费被永久性阻塞。
注意: 顺序消息没有发送失败重试机制,但具有消费失败重试机制。
消费状态
顺序消费目前两个状态:
SUCCESS
和SUSPEND_CURRENT_QUEUE_A_MOMENT
。
SUSPEND_CURRENT_QUEUE_A_MOMENT
意思是先暂停消费一下:
- 过
SuspendCurrentQueueTimeMillis
时间间隔后再重试一下,而不是放到重试队列里。
public enum ConsumeOrderlyStatus {
SUCCESS,
@Deprecated
ROLLBACK,
@Deprecated
COMMIT,
SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
并发消息
并发消息的消费重试
在并发消费中,可能会有多个线程同时消费一个队列的消息。
因此即使发送端通过发送顺序消息保证消息在同一个队列中按照
FIFO
的顺序,也无法保证消息实际被顺序消费。
- 所有并发消费也可以称之为无序消费。
对于无序消息(普通消息、延时消息、事务消息):
- 当
Consumer
消费消息失败时,可以通过设置返回状态达到消息重试的效果。
注意:
无序消息的重试只针对集群消费模式生效。
广播消费模式不提供失败重试特性:即消费失败后,失败消息不再重试,继续消费新的消息。
消费状态
Consumer
端消息消费有两种状态:
一个是成功(
CONSUME_SUCCESS
),一个是失败&稍后重试(RECONSUME_LATER
)。
Consumer
为了保证消息消费成功,只有使用方明确表示消费成功。
- 返回
CONSUME_SUCCESS
,RocketMQ
才会认为消息消费成功。若是消息消费失败,只要返回:
ConsumeConcurrentlyStatus.RECONSUME_LATER
。
RocketMQ
就会认为消息消费失败了,要重新投递。
public enum ConsumeConcurrentlyStatus {
CONSUME_SUCCESS,
RECONSUME_LATER;
}
重试机制
为了保证消息是确定被至少消费成功一次,
RocketMQ
会把这批消息重发回Broker
。
Topic
不是原Topic
而是一个RETRY Topic
。
在延迟的某个时间点(默认是10秒,业务可设置)后,再次投递。
而若是一直这样重复消费都持续失败到必定次数(默认16次),就会投递到死信队列。
在启动Broker
的过程当中,能够观察到以下输出:
2024-09-19 16:29:58 INFO main - messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
RECONSUME_LATER策略:
若是消费失败,那么1S后再次消费,若是失败,那么5S后,再次消费,…… 直至2H后若是消费还失败。
- 那么该条消息就会终止发送给消费者了。
消息重试间隔时间如下:
重试次数 | 与上次重试的间隔时间 | 重试次数 | 与上次重试的间隔时间 |
---|---|---|---|
1 | 10秒 | 9 | 7分钟 |
2 | 30秒 | 10 | 8分钟 |
3 | 1分钟 | 11 | 9分钟 |
4 | 2分钟 | 12 | 10分钟 |
5 | 3分钟 | 13 | 20分钟 |
6 | 4分钟 | 14 | 30分钟 |
7 | 5分钟 | 15 | 1小时 |
8 | 6分钟 | 16 | 2小时 |
某条消息在一直消费失败的前提下,将会在接下来的4小时46分钟之内进行16次重试。
- 超过这个时间范围消息将不再重试投递,而被投递至死信队列。
修改消费重试次数:
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("cg");
// 修改消费重试次数
consumer.setMaxReconsumeTimes(10);
基本原理
重试的
Message
,RocketMQ
的做法并不是将其投递回原Topic
,而是重试队列。每个
ConsumerGroup
都有自己的重试队列:
- 其名称是由特定的前缀拼接上
ConsumerGroup
所组成,默认%RETRY%+消费者组名称
。- 所以在
Consumer
启动时,就会同时消费其ConsumerGroup
对应的重试队列和普通队列。消费失败的
Message
,Consumer
会将其投回Broker
:
- 相当于这条
Message
已经被消费掉了,之后重试的只是内容相同、但实际不是同一条的Message
。- 然后会校验重试的次数,如果达到16次则会进入死信队列 **,**组成为
%DLQ%+消费者组名称
。- 未达到最大重试次数,则会根据重试间隔时间等级将其投递到延迟队列
SCHEDULE_TOPIC_XXXX
中。- 然后等到了延迟等级对应的时间之后,再投递到
ConsumerGroup
所对应的重试队列当中,供后续消费。
消息重复
如果消费端收到两条一样的消息,应该怎样处理?
《RocketMQ 原理简介》中讲到:
RocketMQ
无法避免消息重复。
所以如果业务对消费重复非常敏感,务必要在业务侧去重,有以下几种去重方式:
消费端处理消息的业务逻辑保持幂等性。
- 如何保证幂等性,可以看我之前的文章!
保证每条消息都有唯一编号且保证消息处理成功与去重表的日志同时出现。
- 利用一张日志表来记录已经处理成功的消息的ID。
- 如果新到的消息ID已经在日志表中,那么就不再处理这条消息。