RocketMQ

月伴飞鱼 2024-11-07 11:56:10
消息队列
支付宝打赏 微信打赏

如果文章对你有帮助,欢迎点击上方按钮打赏作者!

官网:https://rocketmq.apache.org/

中文文档:https://github.com/apache/rocketmq/tree/master/docs/cn

最佳实践:https://github.com/apache/rocketmq/blob/master/docs/cn/best_practice.md

设计文档:https://github.com/apache/rocketmq/blob/master/docs/cn/design.md

基本概念

Topic:

一类消息的集合,消息发送者将一类消息发送到一个主题中,例如订单模块将订单发送到 order_topic 中,而用户登录时,将登录事件发送到 user_login_topic 中。

标签(Tag):

消息设置的标志,用于同一Topic下区分不同类型的消息,可以根据Topic+Tag实现消息的精细化生产和消费。

ConsumeGroup:

消息消费组,一个消费组拥有多个消费者,消费组首先在启动时需要订阅需要消费的Topic。

  • 一个Topic可以被多个消费组订阅,同样一个消费组也可以订阅多个主题。

队列(Queue)

一个Topic可以有很多队列,消息都存在Queue上,默认是一个Topic在同一个Broker组中是4个。

  • 如果一个Topic现在在2个Broker组中,那么就有可能有8个队列。

由于一个Topic可能会有很多的队列,消息发送到哪个队列上?

RocketMQ提供了两种消息队列的选择算法:

  • 轮询算法。
  • 最小投递延迟算法。

轮询算法:

  • 一个队列一个队列发送消息,这些就能保证消息能够均匀分布在不同的队列下,默认的队列选择算法。

最小投递延迟算法:

  • 每次消息投递的时候会统计投递的时间延迟,在选择队列的时候会优先选择投递延迟时间小的队列。
  • 这种算法可能会导致消息分布不均匀的问题。

消费模式

集群模式:

同一Topic下的一条消息只会被同一消费组中的一个消费者消费。

  • 消息被负载均衡到了同一个消费组的多个消费者实例上。

图片

广播消费:

每条消息推送给集群内所有的消费者,保证消息至少被每个消费者消费一次。

  • 通常用于刷新内存缓存。

图片

Confirm机制

消息生产者把消息发送给MQ,如果接收成功,MQ会返回一个ACK消息给生产者。

如果消息接收不成功,MQ会返回一个Nack消息给生产者。

img

重试机制

集群消费下,重试机制的本质是 RocketMQ 的延迟消息功能。

Broker 端会为每个 Topic 创建一个重试队列

  • 队列名称是:%RETRY% + 消费者组名

达到重试时间后将消息投递到重试队列中进行消费重试(消费者组会自动订阅重试 Topic)。

最多重试消费 16 次,重试的时间间隔逐渐变长。

  • 若达到最大重试次数后消息还没有成功被消费,则消息将被投递至死信队列

死信队列

死信队列用于处理无法被正常消费的消息。

  • 当一条消息初次消费失败,消息队列会自动进行消息重试。

  • 达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息。

    • 此时,消息队列不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队列中。

RocketMQ将这种正常情况下无法被消费的消息称为死信消息,将存储死信消息的特殊队列称为死信队列。

Queue分配算法

一个Topic中的Queue只能由Consumer Group中的一个Consumer进行消费。

  • 而一个Consumer可以同时消费多个Queue中的消息。

那么QueueConsumer间的配对关系是如何确定的,即Queue要分配给哪个Consumer进行消费?

常见的有四种策略,分别是:

  • 平均分配策略、环形平均策略、一致性Hash策略、同机房策略。

  • 这些策略是通过在创建Consumer时的构造器传进去的。

平均分配策略(默认):

该算法是根据:Avg = QueueCount / ConsumerCount的计算结果进行分配的。

如果能够整除,则按顺序将AvgQueue逐个分配,如果不能整除。

  • 则将多余出的Queue按照Consumer顺序逐个分配。

img

环形分配策略:

环形平均算法是指,根据消费者的顺序,依次由Queue队列组成的环形图逐个分配,该方法不需要提前计算。

img

一致性哈希分配策略:

该算法会将ConsumerHash值作为Node节点存放到Hash环上,然后将QueueHash值也放到Hash环上。

通过顺时针方向,距离Queue最近的那个Consumer就是该Queue要分配的Consumer

一致性哈希算法可以有效减少由于消费者组扩容或缩容所带来的大量的Rebalance

  • 所以它适合用在Consumer数量变化较频繁的场景。

但是一致性哈希算法也存在不足,就是分配效率较低,容易导致分配不均的情况。

即每个消费者消费的队列数,有可能相差很大,这样就会造成个别消费者压力过大。

  • 可以引入虚拟桶,让QueueHash环中尽可能分配均匀。

img

机房分配策略:

该算法会根据Queue的部署机房位置和Consumer的位置,过滤出当前Consumer相同机房的Queue

然后按照平均分配策略或环形平均策略对同机房Queue进行分配。

如果没有同机房Queue,则按照平均配策略或环形平均策略对所有Queue进行分配。

img

Rebalance机制

Rebalance即再均衡:

  • 指的是将⼀个Topic下的多个Queue在同⼀个Consumer Group中的多个Consumer间进行重新分配的过程。

它能够提升消息的并行消费能力。

哪些场景会触发Rebalance

消费者所订阅Topic的队列数量发生变化。

比如动态调整了Topic对应的队列数量,那么此时肯定是要重新分配一下,也就是触发Rebalance再均衡。

  • 例如⼀个Topic下5个队列,有2个消费者的情况下,那么就可以给其中⼀个消费者分配2个队列,给另⼀个分配3个队列。

  • 假设调整到Topic下有7个队列,还是2个消费者的情况下,那么就可以给其中⼀个消费者分配4个队列,给另⼀个分配3个队列。

  • 从而提升消息的并行消费能力。

Broker扩容或缩容、BrokerNameServer间发生网络异常、Queue扩容或缩容等场景。

  • 都可能导致消费者所订阅Topic的队列数量发生变化。

img

消费者组中消费者的数量发生变化。

  • 比如动态添加了Consumer进行消费,那么此时肯定是要重新分配一下,也就是触发Rebalance再均衡。

例如,⼀个Topic下5个队列,在只有1个消费者的情况下,这个消费者将负责消费这5个队列的消息。

  • 如果此时增加⼀个消费者,那么就可以给其中⼀个消费者分配2个队列,给另⼀个分配3个队列,从而提升消息的并行消费能力。

Consumer Group扩容或缩容、ConsumerNameServer间发生网络异常、Consumer发生宕机等。

  • 都会导致消费者组中消费者的数量发生变化。

由于⼀个队列最多分配给⼀个消费者,因此当某个消费者组下的消费者实例数量大于队列的数量时。

  • 多余的消费者实例将分配不到任何队列,等于是多余的消费者什么都不做,白白浪费。

img

Rebalance的危害?

消费暂停:

  • 在只有一个Consumer时,其负责消费所有队列。
  • 在新增了一个Consumer后会触发 Rebalance 的发生。
  • 此时原Consumer就需要暂停部分队列的消费,等到这些队列分配给新的Consumer后,这些暂停消费的队列才能继续被消费。

消费重复:

  • Consumer在消费新分配给自己的队列时,必须接着之前Consumer提交的消费进度的offset继续消费。
  • 然而默认情况下,offset是异步提交的,这个异步性导致提交到BrokeroffsetConsumer实际消费的消息并不一致。
  • 这个不一致的差值就是可能会重复消费的消息。

消费突刺:

  • 由于Rebalance可能导致重复消费,如果需要重复消费的消息过多,或者因为Rebalance暂停时间过长从而导致积压了部分消息。
  • 那么有可能会导致在Rebalance结束之后瞬间需要消费很多消息。

消息过滤

RocketMQ的消费者可以根据Tag进行消息过滤,也支持自定义属性过滤。

消息过滤目前是在Broker端实现的:

  • 优点是减少了对于Consumer无用消息的网络传输。

  • 缺点是增加了Broker的负担、而且实现相对复杂。

RocketMQ支持两种方式的消息过滤:

  • 一种是Tag过滤,另外一种是SQL过滤。

基本组件

image

Nameserver

Nameserver集群,Topic的路由注册中心,为客户端根据Topic提供路由服务,从而引导客户端向Broker发送消息。

Nameserver之间的节点不通信,路由信息在Nameserver集群中采取的最终一致性。

Broker

消息存储服务器,分为两种角色:MasterSlave

RocketMQ中,主服务承担读写操作,从服务器作为一个备份,当主服务器存在压力时,从服务器可以承担读服务(消息消费)。

所有Broker,包含Slave服务器每隔30s会向Nameserver发送心跳包,心跳包中会包含存在在Broker上所有的Topic的路由信息。

Client

消息客户端,包括Producer(消息发送者)和Consumer(消费消费者),客户端在同一时间只会连接一台NameServer

只有在连接出现异常时才会尝试连接另外一台,客户端每隔30s向NameServer发起Topic的路由信息查询。

集群模式

RocketMQ5.0以前,有两种集群部署模式,分别为主从模式(Master-Slave模式)和Dledger模式。

主从模式

主从模式中,Broker 分为 MasterSlave,一个 Master 可对应多个 Slave,一个 Slave 只能对应一个 Master

每个 BrokerNameServer 集群中的所有节点建立长连接,定时注册 Topic 信息到所有 NameServer

Master 节点负责接收客户端的写入请求,并将消息持久化到磁盘上。

Slave 节点则负责从 Master 节点复制消息数据,并保持与 Master 节点的同步。

消费者可以从Master节点拉取消息,也可以从Slave节点拉取消息。

RocketMQ4.5版本之前,如果Master宕机,不支持自动将Slave切换为Master,需要人工介入。

img

Dledger模式

为了解决主从架构下Slave不能自动切换为Master的问题,4.5版本之后提供了DLedger模式。

使用Raft算法,如果Master节点出现故障,可以自动从Slave节点中选举出新的Master进行切换。

存在问题

根据Raft算法的多数原则,集群至少有三个节点以上,在消息写入时。

  • 需要大多数的Follower节点响应成功才能认为消息写入成功。

Dledger模式下,进行消息写入的时候,使用的是OpenMessaging包中提供的接口。

  • 无法利用RocketMQ原生的存储和复制能力。

存在两套日志复制流程(主从模式下一套、Dledger模式下一套),不统一。

Controller模式

为了解决如上问题,RocketMQ5.0以后推出了Controller模式,它的特点如下:

在主从部署模式下就具有自动切换Master的能力,5.0之前需要使用DLedger才可以。

可以利用RocketMQ原生存储复制能力,并统一RocketMQ的存储和复制能力。

RocketMQ5.0Broker选主相关的功能进行了抽离,放在Controller中。

实现了在主从部署模式下就可以自动切换Master,Controller可以独立部署也可以嵌入在NameServer中部署。

独立部署下的Controller

img

嵌入NameServer中的部署图如下:

img

Controller

一般集群中部署多个Controller,使用Raft算法选举出一个Active DLedger Controller作为主控制器。

它主要用来管理一个SyncStateSet集合。

  • 这个集合中存储的是一组跟上Master进度的Broker节点集合。

如果Controller发现某个Master Broker下线时,会从集合中选出新的Master Broker并切换。

  • Controller可以单独部署可以嵌在NameServer中部署。

由于Controller控制每个节点的角色,所以每个Broker也会定时向Controller发送请求获取主备信息。

  • 以便在角色发生变化的时候可以及时更新。

img

同步复制

生产者发送消息后,Master 接收到存储消息请求,将消息数据同步给 Slave 后,才将存储结果返回给生产者。

同步复制模式下,发送消息会有一定延迟,系统吞吐量也会降低。

异步复制

生产者发送消息后,Master 接收到存储消息请求,将消息存储后,直接将存储结果返回给生产者。

MasterSlave 再通过异步的方式同步数据,这种复制模式具有较小的延迟,可以实现比较高的吞吐量。

Master 出现故障,有些数据可能未写入 Slave,未同步的数据可能丢失。

Proxy代理层

RocketMQ5.0以前使用自定义的Remoting协议底层基于Netty进行网络通信,计算存储是一体的,都在Broker中。

生产者和消费者从NameServer中拉取到路由信息,之后直接与Broker交互进行消息的生产与消费。

img

5.0以后引入了弹性无状态的代理模式,对Broker的职责进行了拆分。

将客户端协议适配、权限管理、消费管理等计算逻辑进行了抽离,放入Proxy层,Broker专注数据的存储。

  • 以便更好的适应云原生环境,实现资源弹性调度。

并且5.0以后增加了GRPC协议的支持,它是Google开源的高性能RPC框架,基于Protobuf序列化。

img

从架构上来看,增加Proxy代理层后,生产者和消费者不再直接与Broker通信,而是与Proxy层通信。

  • Proxy层再与NameServerBroker交互进行消息的发送和消费。

如果需要提高计算层的能力,只需要增加Proxy层,如果需要提高存储层的能力,增加Broker的部署即可。

消费模式

RocketMQ5.0之前,消费有两种方式可以从Broker获取消息,分别为Pull模式和Push模式。

Pull模式

消费需要不断的从阻塞队列中获取数据,如果没有数据就等待,这个阻塞队列中的数据由消息拉取线程从Broker拉取消息之后加入的。

  • 所以Pull模式下消费需要不断主动从Broker拉取消息。

Push模式

需要注册消息监听器,当有消息到达时会通过回调函数进行消息消费。

  • 从表面上看就像是Broker主动推送给消费者一样,所以叫做推模式

底层依旧是消费者从Broker拉取数据然后触发回调函数进行消息消费,只不过不需要像Pull模式一样不断判断是否有消息到来。

RocketMQ5.0增加了Pop模式消费,将负载均衡、消费位点管理等功能放到了Broker端,减少客户端的负担,使其变得轻量级。

并且5.0之后支持消息粒度的负载均衡。

消息粒度负载均衡

消息粒度负载均衡策略中,同一消费组内的多个消费者将按照消息粒度平均分摊主题中的所有消息。

  • 即同一个队列中的消息,可被平均分配给组内多个消费者共同消费。

Pop消息消费

首先客户端(消费者)向服务端(Broker)发送Pop请求,Broker端收到请求后以Pop模式获取消息,之后返回给客户端。

  • 客户端消费消息成功之后,Broker发送ACK请求确认消息消费成功。

img

POP的消费位点由Broker保存和控制,并且POP模式可以使多个消费者端消费同一个消息队列中的消息。

消费者端不再需要在本地做负载均衡分配消息队列,只需要调用服务端提供的POP接口获取消息进行消费即可。

即便某个消费者Hang住,其他消费者依旧可以继续消费队列中的数据,不会造成消息堆积。

img

消息类型

延迟消息

RocketMQ支持18个等级,每个等级代表一个延迟时间。

private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

原理解析:

Brocker启动时,根据不同等级开启定时器,即分别对每个延迟等级都开启一个定时器。

  • RocketMQ是基于java.util.Timer的定时器。
image-20231125220819038

事务消息

支持在分布式场景下保障消息生产和本地事务的最终一致性。

image-20231007114659517

1、生产者将消息发送至 Broker

2、Broker 将消息持久化成功之后,向生产者返回 ACK 确认消息已经发送成功,此时消息被标记为 暂不能投递。

  • 这种状态下的消息即为半事务消息。

3、生产者开始执行本地事务逻辑。

4、生产者根据本地事务执行结果向服务端提交二次确认结果(Commit 或是 Rollback),Broker 收到确认结果后处理逻辑如下:

  • 二次确认结果为 CommitBroker 将半事务消息标记为可投递,并投递给消费者。

  • 二次确认结果为 RollbackBroker 将回滚事务,不会将半事务消息投递给消费者。

5、在断网或者是生产者应用重启的特殊情况下:

Broker 未收到发送者提交的二次确认结果,或 Broker 收到的二次确认结果为 Unknown 未知状态。

经过固定时间后,服务端将对消息生产者即生产者集群中任一生产者实例发起消息回查

  • 生产者收到消息回查后,需要检查对应消息的本地事务执行的最终结果。

  • 生产者根据检查到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤4对半事务消息进行处理。

为什么要先发送Half Message(半消息):

可以先确认Broker服务器是否正常,如果半消息都发送失败了,那说明Broker挂了。

什么情况会回查:

执行本地事务的时候,由于突然网络等原因一直没有返回执行事务的结果(Commit或者Rollback)导致最终返回UNKNOW,就会回查。

本地事务执行成功后,返回Commit进行消息二次确认的时候的服务挂了。

  • 这个时候在Broker端,它还是个Half Message(半消息),这也会回查。

基本原理

总体架构图

img

零拷贝

零拷贝技术是一个思想,指的是指计算机执行操作时,CPU不需要先将数据从某处内存复制到另一个特定区域。

  • RocketMQ内部使用基于mmap实现的零拷贝,用来读写文件。

mmap():

mmap(memory map)是一种内存映射文件的方法。

  • 即将一个文件或者其它对象映射到进程的地址空间,实现文件磁盘地址和进程虚拟地址空间中一段虚拟地址的一一对映关系。

简单地说就是内核缓冲区和应用缓冲区共享,从而减少了从读缓冲区到用户缓冲区的一次CPU拷贝。

消息存储

CommitLog:

CommitLog,消息存储文件,所有主题的消息都存储在 CommitLog 文件中。

业务系统向 RocketMQ 发送一条消息,最终这条消息会被持久化到CommitLog文件。

一台Broker服务器只有一个CommitLog文件(组),RocketMQ会将所有主题的消息存储在同一个文件中,这个文件中就存储着一条条Message,每条Message都会按照顺序写入。

image-20240122170451767

ConsumeQueue:

它是为了高效检索主题消息的。

通过ConsumerQueue可以知道消息的长度和偏移量,那么查找消息就比较容易了。

消息偏移量的差值等于 = 消息长度 * 队列长度

Index:

除了通过消息偏移量来查找消息的方式,还提供了其他几种方式可以查询消息:

  • 通过Message Key查询
  • 通过Unique Key查询
  • 通过Message Id查询

Message Key和Unique Key都是在消息发送之前,由客户端生成的。

可以自己设置,也可以由客户端自动生成,Message Id是在Broker端存储消息的时候生成。

消费方式

RocketMQ消费方式有PUSH与PULL两种,但实现机制实为 PULL 模式

PUSH 模式是一种伪推送,是对 PULL 模式的封装,每拉去一批消息后,提交到消费端的线程池(异步),然后马上向 Broker 拉取消息,即实现类似 的效果

img

拉取式消费(Pull Consumer):

Pull指:由消费者客户端主动向消息中间件拉取消息

推动式消费(Push Consumer):

Push指:由消息中间件主动地将消息推送给消费者

  • 采用Push方式,可以尽可能实时地将消息发送给消费者进行消费

长轮询:

在PULL模式下为了保证消费的实时性,采起了长轮询消息服务器拉取消息的方式,每隔一定时间客户端向服务端发起一次请求

  • 长轮询还是由Consumer发起的,因此就算Broker端有大量数据也不会主动推送给Consumer

应用场景

消息重试(消费者)

Consumer消费消息失败后,要提供一种重试机制,令消息再消费一次。

Consumer消费消息失败有以下几种情况:

  • 由于消息本身的原因,例如反序列化失败,消息数据本身无法处理。

  • 由于依赖的下游应用服务不可用,例如DB连接不可用,外系统网络不可达等。

RocketMQ会为每个消费组都设置一个Topic名称为%RETRY%+consumerGroup的重试队列:

  • 这个Topic的重试队列是针对消费组,而不是针对每个Topic设置的。
  • 用于暂时保存因为各种异常而导致Consumer端无法消费的消息。

考虑到异常恢复起来需要一些时间,会为重试队列设置多个重试级别:

  • 每个重试级别都有与之对应的重新投递延时,重试次数越多投递延时就越大。

RocketMQ对于重试消息的处理:

  • 先保存至Topic名称为SCHEDULE_TOPIC_XXXX的延迟队列中。
  • 后台定时任务按照对应的时间进行Delay后重新保存至%RETRY%+consumerGroup的重试队列中。

消息重投(生产者)

生产者在发送消息时,同步消息失败会重投,异步消息有重试,oneway没有任何保证。

  • 消息重投保证消息尽可能发送成功、不丢失,但可能会造成消息重复,消息重复在RocketMQ中是无法避免的问题。

消息重复在一般情况下不会发生,当出现消息量大、网络抖动,消息重复就会是大概率事件。

  • 生产者主动重发Consumer负载变化也会导致重复消息。

如下方法可以设置消息重试策略:

  • retryTimesWhenSendFailed:

    • 同步发送失败重投次数,默认为2,因此生产者会最多尝试发送retryTimesWhenSendFailed + 1次。
    • 不会选择上次失败的broker,尝试向其他broker发送,最大程度保证消息不丢。
    • 超过重投次数,抛出异常,由客户端保证消息不丢。
    • 当出现RemotingException、MQClientException和部分MQBrokerException时会重投。
  • retryTimesWhenSendAsyncFailed:

    • 异步发送失败重试次数,异步重试不会选择其他broker,仅在同一个broker上做重试,不保证消息不丢。
  • retryAnotherBrokerWhenNotStoreOK:

    • 消息刷盘(主或备)超时或slave不可用(返回状态非SEND_OK),是否尝试发送到其他broker,默认false。

顺序消费

顺序消息是指对于一个指定的 Topic ,消息严格按照先进先出(FIFO)的原则进行消息发布和消费,即先发布的消息先消费,后发布的消息后消费。

分区顺序消息

对于指定的一个 Topic ,所有消息根据 Sharding Key 进行区块分区,同一个分区内的消息按照严格的先进先出(FIFO)原则进行发布和消费。

同一分区内的消息保证顺序,不同分区之间的消息顺序不做要求。

适用场景:

  • 适用于性能要求高,以 Sharding Key 作为分区字段,在同一个区块中严格地按照先进先出(FIFO)原则进行消息发布和消费的场景。

示例:

  • 电商的订单创建,以订单 ID 作为 Sharding Key ,那么同一个订单相关的创建订单消息、订单支付消息、订单退款消息、订单物流消息都会按照发布的先后顺序来消费。

全局顺序消息

对于指定的一个 Topic ,所有消息按照严格的先入先出(FIFO)的顺序来发布和消费。

适用场景:

  • 适用于性能要求不高,所有的消息严格按照 FIFO 原则来发布和消费的场景。

示例:

  • 在证券处理中,以人民币兑换美元为 Topic,在价格相同的情况下,先出价者优先处理,则可以按照 FIFO 的方式发布和消费全局顺序消息。
支付宝打赏 微信打赏

如果文章对你有帮助,欢迎点击上方按钮打赏作者!