RocketMQ消息机制!
RocketMQ消息机制!
月伴飞鱼消息重试(消费者)
Consumer消费消息失败后,要提供一种重试机制,令消息再消费一次。
Consumer消费消息失败有以下几种情况:
由于消息本身的原因,例如反序列化失败,消息数据本身无法处理。
由于依赖的下游应用服务不可用,例如DB连接不可用,外系统网络不可达等。
RocketMQ会为每个消费组都设置一个Topic名称为
%RETRY%+consumerGroup
的重试队列:
- 这个Topic的重试队列是针对消费组,而不是针对每个Topic设置的。
- 用于暂时保存因为各种异常而导致Consumer端无法消费的消息。
考虑到异常恢复起来需要一些时间,会为重试队列设置多个重试级别:
- 每个重试级别都有与之对应的重新投递延时,重试次数越多投递延时就越大。
RocketMQ对于重试消息的处理:
- 先保存至Topic名称为
SCHEDULE_TOPIC_XXXX
的延迟队列中。- 后台定时任务按照对应的时间进行Delay后重新保存至
%RETRY%+consumerGroup
的重试队列中。
消息重投(生产者)
生产者在发送消息时,同步消息失败会重投,异步消息有重试,
oneway
没有任何保证。
- 消息重投保证消息尽可能发送成功、不丢失,但可能会造成消息重复,消息重复在RocketMQ中是无法避免的问题。
消息重复在一般情况下不会发生,当出现消息量大、网络抖动,消息重复就会是大概率事件。
- 生产者主动重发Consumer负载变化也会导致重复消息。
如下方法可以设置消息重试策略:
retryTimesWhenSendFailed:
- 同步发送失败重投次数,默认为2,因此生产者会最多尝试发送
retryTimesWhenSendFailed + 1
次。- 不会选择上次失败的broker,尝试向其他broker发送,最大程度保证消息不丢。
- 超过重投次数,抛出异常,由客户端保证消息不丢。
- 当出现
RemotingException、MQClientException
和部分MQBrokerException
时会重投。retryTimesWhenSendAsyncFailed:
- 异步发送失败重试次数,异步重试不会选择其他broker,仅在同一个broker上做重试,不保证消息不丢。
retryAnotherBrokerWhenNotStoreOK:
- 消息刷盘(主或备)超时或slave不可用(返回状态非
SEND_OK
),是否尝试发送到其他broker,默认false。
顺序消费
顺序消息是指对于一个指定的 Topic ,消息严格按照先进先出(FIFO)的原则进行消息发布和消费。
即先发布的消息先消费,后发布的消息后消费。
分区顺序消息
对于指定的一个 Topic ,所有消息根据 Sharding Key 进行区块分区,同一个分区内的消息按照严格的先进先出(FIFO)原则进行发布和消费。
同一分区内的消息保证顺序,不同分区之间的消息顺序不做要求。
适用场景:
- 适用于性能要求高,以 Sharding Key 作为分区字段,在同一个区块中严格地按照先进先出(FIFO)原则进行消息发布和消费的场景。
示例:
- 电商的订单创建,以订单 ID 作为 Sharding Key ,那么同一个订单相关的创建订单消息、订单支付消息、订单退款消息、订单物流消息都会按照发布的先后顺序来消费。
全局顺序消息
对于指定的一个 Topic ,所有消息按照严格的先入先出(FIFO)的顺序来发布和消费。
适用场景:
- 适用于性能要求不高,所有的消息严格按照 FIFO 原则来发布和消费的场景。
示例:
- 在证券处理中,以人民币兑换美元为 Topic,在价格相同的情况下,先出价者优先处理,则可以按照 FIFO 的方式发布和消费全局顺序消息。