官网:https://rocketmq.apache.org/
中文文档:https://github.com/apache/rocketmq/tree/master/docs/cn
最佳实践:https://github.com/apache/rocketmq/blob/master/docs/cn/best_practice.md
设计文档:https://github.com/apache/rocketmq/blob/master/docs/cn/design.md
基本概念
Topic:
一类消息的集合,消息发送者将一类消息发送到一个主题中,例如订单模块将订单发送到
order_topic
中,而用户登录时,将登录事件发送到user_login_topic
中。
标签(Tag):
消息设置的标志,用于同一Topic下区分不同类型的消息,可以根据Topic+Tag实现消息的精细化生产和消费。
ConsumeGroup:
消息消费组,一个消费组拥有多个消费者,消费组首先在启动时需要订阅需要消费的Topic。
- 一个Topic可以被多个消费组订阅,同样一个消费组也可以订阅多个主题。
队列(Queue):
一个Topic可以有很多队列,消息都存在Queue上,默认是一个Topic在同一个Broker组中是4个。
- 如果一个Topic现在在2个Broker组中,那么就有可能有8个队列。
由于一个Topic可能会有很多的队列,消息发送到哪个队列上?
RocketMQ提供了两种消息队列的选择算法:
- 轮询算法。
- 最小投递延迟算法。
轮询算法:
- 一个队列一个队列发送消息,这些就能保证消息能够均匀分布在不同的队列下,默认的队列选择算法。
最小投递延迟算法:
- 每次消息投递的时候会统计投递的时间延迟,在选择队列的时候会优先选择投递延迟时间小的队列。
- 这种算法可能会导致消息分布不均匀的问题。
消费模式
集群模式:
同一
Topic
下的一条消息只会被同一消费组中的一个消费者消费。
- 消息被负载均衡到了同一个消费组的多个消费者实例上。
广播消费:
每条消息推送给集群内所有的消费者,保证消息至少被每个消费者消费一次。
- 通常用于刷新内存缓存。
Confirm机制
消息生产者把消息发送给
MQ
,如果接收成功,MQ
会返回一个ACK
消息给生产者。如果消息接收不成功,
MQ
会返回一个Nack
消息给生产者。
重试机制
集群消费下,重试机制的本质是
RocketMQ
的延迟消息功能。
Broker
端会为每个Topic
创建一个重试队列 :
- 队列名称是:
%RETRY% + 消费者组名
。达到重试时间后将消息投递到重试队列中进行消费重试(消费者组会自动订阅重试
Topic
)。最多重试消费 16 次,重试的时间间隔逐渐变长。
- 若达到最大重试次数后消息还没有成功被消费,则消息将被投递至死信队列。
死信队列
死信队列用于处理无法被正常消费的消息。
当一条消息初次消费失败,消息队列会自动进行消息重试。
达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息。
- 此时,消息队列不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队列中。
RocketMQ
将这种正常情况下无法被消费的消息称为死信消息,将存储死信消息的特殊队列称为死信队列。
Queue分配算法
一个
Topic
中的Queue
只能由Consumer Group
中的一个Consumer
进行消费。
- 而一个
Consumer
可以同时消费多个Queue
中的消息。那么
Queue
与Consumer
间的配对关系是如何确定的,即Queue
要分配给哪个Consumer
进行消费?常见的有四种策略,分别是:
平均分配策略、环形平均策略、一致性
Hash
策略、同机房策略。这些策略是通过在创建
Consumer
时的构造器传进去的。
平均分配策略(默认):
该算法是根据:
Avg = QueueCount / ConsumerCount
的计算结果进行分配的。如果能够整除,则按顺序将
Avg
个Queue
逐个分配,如果不能整除。
- 则将多余出的
Queue
按照Consumer
顺序逐个分配。
环形分配策略:
环形平均算法是指,根据消费者的顺序,依次由
Queue
队列组成的环形图逐个分配,该方法不需要提前计算。
一致性哈希分配策略:
该算法会将
Consumer
的Hash
值作为Node
节点存放到Hash
环上,然后将Queue
的Hash
值也放到Hash
环上。通过顺时针方向,距离
Queue
最近的那个Consumer
就是该Queue
要分配的Consumer
。一致性哈希算法可以有效减少由于消费者组扩容或缩容所带来的大量的
Rebalance
。
- 所以它适合用在
Consumer
数量变化较频繁的场景。但是一致性哈希算法也存在不足,就是分配效率较低,容易导致分配不均的情况。
即每个消费者消费的队列数,有可能相差很大,这样就会造成个别消费者压力过大。
- 可以引入虚拟桶,让
Queue
在Hash
环中尽可能分配均匀。
机房分配策略:
该算法会根据
Queue
的部署机房位置和Consumer
的位置,过滤出当前Consumer
相同机房的Queue
。然后按照平均分配策略或环形平均策略对同机房
Queue
进行分配。如果没有同机房
Queue
,则按照平均配策略或环形平均策略对所有Queue
进行分配。
Rebalance机制
Rebalance
即再均衡:
- 指的是将⼀个
Topic
下的多个Queue
在同⼀个Consumer Group
中的多个Consumer
间进行重新分配的过程。它能够提升消息的并行消费能力。
哪些场景会触发Rebalance
?
消费者所订阅
Topic
的队列数量发生变化。比如动态调整了
Topic
对应的队列数量,那么此时肯定是要重新分配一下,也就是触发Rebalance
再均衡。
例如⼀个
Topic
下5个队列,有2个消费者的情况下,那么就可以给其中⼀个消费者分配2个队列,给另⼀个分配3个队列。假设调整到
Topic
下有7个队列,还是2个消费者的情况下,那么就可以给其中⼀个消费者分配4个队列,给另⼀个分配3个队列。从而提升消息的并行消费能力。
像
Broker
扩容或缩容、Broker
与NameServer
间发生网络异常、Queue
扩容或缩容等场景。
- 都可能导致消费者所订阅
Topic
的队列数量发生变化。
消费者组中消费者的数量发生变化。
- 比如动态添加了
Consumer
进行消费,那么此时肯定是要重新分配一下,也就是触发Rebalance
再均衡。例如,⼀个
Topic
下5个队列,在只有1个消费者的情况下,这个消费者将负责消费这5个队列的消息。
- 如果此时增加⼀个消费者,那么就可以给其中⼀个消费者分配2个队列,给另⼀个分配3个队列,从而提升消息的并行消费能力。
像
Consumer Group
扩容或缩容、Consumer
与NameServer
间发生网络异常、Consumer
发生宕机等。
- 都会导致消费者组中消费者的数量发生变化。
由于⼀个队列最多分配给⼀个消费者,因此当某个消费者组下的消费者实例数量大于队列的数量时。
- 多余的消费者实例将分配不到任何队列,等于是多余的消费者什么都不做,白白浪费。
Rebalance
的危害?
消费暂停:
- 在只有一个
Consumer
时,其负责消费所有队列。- 在新增了一个
Consumer
后会触发Rebalance
的发生。- 此时原
Consumer
就需要暂停部分队列的消费,等到这些队列分配给新的Consumer
后,这些暂停消费的队列才能继续被消费。消费重复:
Consumer
在消费新分配给自己的队列时,必须接着之前Consumer
提交的消费进度的offset
继续消费。- 然而默认情况下,
offset
是异步提交的,这个异步性导致提交到Broker
的offset
与Consumer
实际消费的消息并不一致。- 这个不一致的差值就是可能会重复消费的消息。
消费突刺:
- 由于
Rebalance
可能导致重复消费,如果需要重复消费的消息过多,或者因为Rebalance
暂停时间过长从而导致积压了部分消息。- 那么有可能会导致在
Rebalance
结束之后瞬间需要消费很多消息。
消息过滤
RocketMQ
的消费者可以根据Tag
进行消息过滤,也支持自定义属性过滤。消息过滤目前是在
Broker
端实现的:
优点是减少了对于
Consumer
无用消息的网络传输。缺点是增加了
Broker
的负担、而且实现相对复杂。
RocketMQ
支持两种方式的消息过滤:
- 一种是
Tag
过滤,另外一种是SQL
过滤。
基本组件
Nameserver
:
Nameserver
集群,Topic
的路由注册中心,为客户端根据Topic
提供路由服务,从而引导客户端向Broker
发送消息。
Nameserver
之间的节点不通信,路由信息在Nameserver
集群中采取的最终一致性。
Broker
:
消息存储服务器,分为两种角色:
Master
与Slave
。在
RocketMQ
中,主服务承担读写操作,从服务器作为一个备份,当主服务器存在压力时,从服务器可以承担读服务(消息消费)。所有
Broker
,包含Slave
服务器每隔30s会向Nameserver
发送心跳包,心跳包中会包含存在在Broker
上所有的Topic
的路由信息。
Client
:
消息客户端,包括
Producer
(消息发送者)和Consumer
(消费消费者),客户端在同一时间只会连接一台NameServer
。只有在连接出现异常时才会尝试连接另外一台,客户端每隔30s向
NameServer
发起Topic
的路由信息查询。
集群模式
在RocketMQ5.0
以前,有两种集群部署模式,分别为主从模式(Master-Slave
模式)和Dledger
模式。
主从模式
主从模式中,
Broker
分为Master
与Slave
,一个Master
可对应多个Slave
,一个Slave
只能对应一个Master
。每个
Broker
与NameServer
集群中的所有节点建立长连接,定时注册Topic
信息到所有NameServer
。
Master
节点负责接收客户端的写入请求,并将消息持久化到磁盘上。
Slave
节点则负责从Master
节点复制消息数据,并保持与Master
节点的同步。消费者可以从
Master
节点拉取消息,也可以从Slave
节点拉取消息。在
RocketMQ4.5
版本之前,如果Master
宕机,不支持自动将Slave
切换为Master
,需要人工介入。
Dledger模式
为了解决主从架构下
Slave
不能自动切换为Master
的问题,4.5版本之后提供了DLedger
模式。使用
Raft
算法,如果Master
节点出现故障,可以自动从Slave
节点中选举出新的Master
进行切换。存在问题
根据
Raft
算法的多数原则,集群至少有三个节点以上,在消息写入时。
- 需要大多数的
Follower
节点响应成功才能认为消息写入成功。
Dledger
模式下,进行消息写入的时候,使用的是OpenMessaging
包中提供的接口。
- 无法利用
RocketMQ
原生的存储和复制能力。存在两套日志复制流程(主从模式下一套、
Dledger
模式下一套),不统一。
Controller模式
为了解决如上问题,RocketMQ5.0
以后推出了Controller
模式,它的特点如下:
在主从部署模式下就具有自动切换
Master
的能力,5.0
之前需要使用DLedger
才可以。可以利用
RocketMQ
原生存储复制能力,并统一RocketMQ
的存储和复制能力。
RocketMQ5.0
对Broker
选主相关的功能进行了抽离,放在Controller
中。
实现了在主从部署模式下就可以自动切换Master,Controller
可以独立部署也可以嵌入在NameServer
中部署。
独立部署下的Controller
:
嵌入NameServer
中的部署图如下:
Controller
一般集群中部署多个
Controller
,使用Raft
算法选举出一个Active DLedger Controller
作为主控制器。它主要用来管理一个
SyncStateSet
集合。
- 这个集合中存储的是一组跟上
Master
进度的Broker
节点集合。如果
Controller
发现某个Master Broker
下线时,会从集合中选出新的Master Broker
并切换。
Controller
可以单独部署可以嵌在NameServer
中部署。
由于Controller
控制每个节点的角色,所以每个Broker
也会定时向Controller
发送请求获取主备信息。
- 以便在角色发生变化的时候可以及时更新。
同步复制
生产者发送消息后,
Master
接收到存储消息请求,将消息数据同步给Slave
后,才将存储结果返回给生产者。同步复制模式下,发送消息会有一定延迟,系统吞吐量也会降低。
异步复制
生产者发送消息后,
Master
接收到存储消息请求,将消息存储后,直接将存储结果返回给生产者。
Master
和Slave
再通过异步的方式同步数据,这种复制模式具有较小的延迟,可以实现比较高的吞吐量。若
Master
出现故障,有些数据可能未写入Slave
,未同步的数据可能丢失。
Proxy代理层
RocketMQ5.0
以前使用自定义的Remoting
协议底层基于Netty
进行网络通信,计算存储是一体的,都在Broker
中。
生产者和消费者从NameServer
中拉取到路由信息,之后直接与Broker
交互进行消息的生产与消费。
5.0以后引入了弹性无状态的代理模式,对
Broker
的职责进行了拆分。将客户端协议适配、权限管理、消费管理等计算逻辑进行了抽离,放入
Proxy
层,Broker
专注数据的存储。
- 以便更好的适应云原生环境,实现资源弹性调度。
并且5.0以后增加了
GRPC
协议的支持,它是Protobuf
序列化。
从架构上来看,增加
Proxy
代理层后,生产者和消费者不再直接与Broker
通信,而是与Proxy
层通信。
Proxy
层再与NameServer
和Broker
交互进行消息的发送和消费。如果需要提高计算层的能力,只需要增加
Proxy
层,如果需要提高存储层的能力,增加Broker
的部署即可。
消费模式
在RocketMQ5.0
之前,消费有两种方式可以从Broker
获取消息,分别为Pull
模式和Push
模式。
Pull模式:
消费需要不断的从阻塞队列中获取数据,如果没有数据就等待,这个阻塞队列中的数据由消息拉取线程从
Broker
拉取消息之后加入的。
- 所以
Pull
模式下消费需要不断主动从Broker
拉取消息。Push模式:
需要注册消息监听器,当有消息到达时会通过回调函数进行消息消费。
- 从表面上看就像是
Broker
主动推送给消费者一样,所以叫做推模式。底层依旧是消费者从
Broker
拉取数据然后触发回调函数进行消息消费,只不过不需要像Pull
模式一样不断判断是否有消息到来。
在RocketMQ5.0
增加了Pop
模式消费,将负载均衡、消费位点管理等功能放到了Broker
端,减少客户端的负担,使其变得轻量级。
并且5.0之后支持消息粒度的负载均衡。
消息粒度负载均衡
消息粒度负载均衡策略中,同一消费组内的多个消费者将按照消息粒度平均分摊主题中的所有消息。
- 即同一个队列中的消息,可被平均分配给组内多个消费者共同消费。
Pop消息消费
首先客户端(消费者)向服务端(
Broker
)发送Pop
请求,Broker
端收到请求后以Pop
模式获取消息,之后返回给客户端。
- 客户端消费消息成功之后,
Broker
发送ACK
请求确认消息消费成功。
POP
的消费位点由Broker
保存和控制,并且POP
模式可以使多个消费者端消费同一个消息队列中的消息。消费者端不再需要在本地做负载均衡分配消息队列,只需要调用服务端提供的
POP
接口获取消息进行消费即可。即便某个消费者
Hang
住,其他消费者依旧可以继续消费队列中的数据,不会造成消息堆积。
消息类型
延迟消息
RocketMQ支持18个等级,每个等级代表一个延迟时间。
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
原理解析:
Brocker启动时,根据不同等级开启定时器,即分别对每个延迟等级都开启一个定时器。
- RocketMQ是基于
java.util.Timer
的定时器。
事务消息
支持在分布式场景下保障消息生产和本地事务的最终一致性。
1、生产者将消息发送至
Broker
。2、
Broker
将消息持久化成功之后,向生产者返回ACK
确认消息已经发送成功,此时消息被标记为 暂不能投递。
- 这种状态下的消息即为半事务消息。
3、生产者开始执行本地事务逻辑。
4、生产者根据本地事务执行结果向服务端提交二次确认结果(
Commit
或是Rollback
),Broker
收到确认结果后处理逻辑如下:
二次确认结果为
Commit
:Broker
将半事务消息标记为可投递,并投递给消费者。二次确认结果为
Rollback
:Broker
将回滚事务,不会将半事务消息投递给消费者。5、在断网或者是生产者应用重启的特殊情况下:
若
Broker
未收到发送者提交的二次确认结果,或Broker
收到的二次确认结果为Unknown
未知状态。经过固定时间后,服务端将对消息生产者即生产者集群中任一生产者实例发起消息回查:
生产者收到消息回查后,需要检查对应消息的本地事务执行的最终结果。
生产者根据检查到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤4对半事务消息进行处理。
为什么要先发送Half Message
(半消息):
可以先确认
Broker
服务器是否正常,如果半消息都发送失败了,那说明Broker
挂了。
什么情况会回查:
执行本地事务的时候,由于突然网络等原因一直没有返回执行事务的结果(
Commit
或者Rollback
)导致最终返回UNKNOW
,就会回查。本地事务执行成功后,返回
Commit
进行消息二次确认的时候的服务挂了。
- 这个时候在
Broker
端,它还是个Half Message
(半消息),这也会回查。
基本原理
总体架构图
零拷贝
零拷贝技术是一个思想,指的是指计算机执行操作时,CPU不需要先将数据从某处内存复制到另一个特定区域。
- RocketMQ内部使用基于mmap实现的零拷贝,用来读写文件。
mmap():
mmap(memory map)是一种内存映射文件的方法。
- 即将一个文件或者其它对象映射到进程的地址空间,实现文件磁盘地址和进程虚拟地址空间中一段虚拟地址的一一对映关系。
简单地说就是内核缓冲区和应用缓冲区共享,从而减少了从读缓冲区到用户缓冲区的一次CPU拷贝。
消息存储
CommitLog:
CommitLog
,消息存储文件,所有主题的消息都存储在CommitLog
文件中。业务系统向
RocketMQ
发送一条消息,最终这条消息会被持久化到CommitLog
文件。一台
Broker服务器
只有一个CommitLog
文件(组),RocketMQ
会将所有主题的消息存储在同一个文件中,这个文件中就存储着一条条Message,每条Message都会按照顺序写入。
ConsumeQueue:
它是为了高效检索主题消息的。
通过
ConsumerQueue
可以知道消息的长度和偏移量,那么查找消息就比较容易了。消息偏移量的差值等于 =
消息长度 * 队列长度
。
Index:
除了通过消息偏移量来查找消息的方式,还提供了其他几种方式可以查询消息:
- 通过Message Key查询
- 通过Unique Key查询
- 通过Message Id查询
Message Key和Unique Key
都是在消息发送之前,由客户端生成的。可以自己设置,也可以由客户端自动生成,
Message Id
是在Broker
端存储消息的时候生成。
消费方式
RocketMQ消费方式有PUSH与PULL两种,但实现机制实为
PULL
模式PUSH 模式是一种伪推送,是对 PULL 模式的封装,每拉去一批消息后,提交到消费端的线程池(异步),然后马上向
Broker
拉取消息,即实现类似 推 的效果
拉取式消费(Pull Consumer):
Pull指:由消费者客户端主动向消息中间件拉取消息
推动式消费(Push Consumer):
Push指:由消息中间件主动地将消息推送给消费者
- 采用Push方式,可以尽可能实时地将消息发送给消费者进行消费
长轮询:
在PULL模式下为了保证消费的实时性,采起了长轮询消息服务器拉取消息的方式,每隔一定时间客户端向服务端发起一次请求
- 长轮询还是由
Consumer
发起的,因此就算Broker
端有大量数据也不会主动推送给Consumer
应用场景
消息重试(消费者)
Consumer消费消息失败后,要提供一种重试机制,令消息再消费一次。
Consumer消费消息失败有以下几种情况:
由于消息本身的原因,例如反序列化失败,消息数据本身无法处理。
由于依赖的下游应用服务不可用,例如DB连接不可用,外系统网络不可达等。
RocketMQ会为每个消费组都设置一个Topic名称为
%RETRY%+consumerGroup
的重试队列:
- 这个Topic的重试队列是针对消费组,而不是针对每个Topic设置的。
- 用于暂时保存因为各种异常而导致Consumer端无法消费的消息。
考虑到异常恢复起来需要一些时间,会为重试队列设置多个重试级别:
- 每个重试级别都有与之对应的重新投递延时,重试次数越多投递延时就越大。
RocketMQ对于重试消息的处理:
- 先保存至Topic名称为
SCHEDULE_TOPIC_XXXX
的延迟队列中。- 后台定时任务按照对应的时间进行Delay后重新保存至
%RETRY%+consumerGroup
的重试队列中。
消息重投(生产者)
生产者在发送消息时,同步消息失败会重投,异步消息有重试,
oneway
没有任何保证。
- 消息重投保证消息尽可能发送成功、不丢失,但可能会造成消息重复,消息重复在RocketMQ中是无法避免的问题。
消息重复在一般情况下不会发生,当出现消息量大、网络抖动,消息重复就会是大概率事件。
- 生产者主动重发Consumer负载变化也会导致重复消息。
如下方法可以设置消息重试策略:
retryTimesWhenSendFailed:
- 同步发送失败重投次数,默认为2,因此生产者会最多尝试发送
retryTimesWhenSendFailed + 1
次。- 不会选择上次失败的broker,尝试向其他broker发送,最大程度保证消息不丢。
- 超过重投次数,抛出异常,由客户端保证消息不丢。
- 当出现
RemotingException、MQClientException
和部分MQBrokerException
时会重投。retryTimesWhenSendAsyncFailed:
- 异步发送失败重试次数,异步重试不会选择其他broker,仅在同一个broker上做重试,不保证消息不丢。
retryAnotherBrokerWhenNotStoreOK:
- 消息刷盘(主或备)超时或slave不可用(返回状态非
SEND_OK
),是否尝试发送到其他broker,默认false。
顺序消费
顺序消息是指对于一个指定的 Topic ,消息严格按照先进先出(FIFO)的原则进行消息发布和消费,即先发布的消息先消费,后发布的消息后消费。
分区顺序消息
对于指定的一个 Topic ,所有消息根据 Sharding Key 进行区块分区,同一个分区内的消息按照严格的先进先出(FIFO)原则进行发布和消费。
同一分区内的消息保证顺序,不同分区之间的消息顺序不做要求。
适用场景:
- 适用于性能要求高,以 Sharding Key 作为分区字段,在同一个区块中严格地按照先进先出(FIFO)原则进行消息发布和消费的场景。
示例:
- 电商的订单创建,以订单 ID 作为 Sharding Key ,那么同一个订单相关的创建订单消息、订单支付消息、订单退款消息、订单物流消息都会按照发布的先后顺序来消费。
全局顺序消息
对于指定的一个 Topic ,所有消息按照严格的先入先出(FIFO)的顺序来发布和消费。
适用场景:
- 适用于性能要求不高,所有的消息严格按照 FIFO 原则来发布和消费的场景。
示例:
- 在证券处理中,以人民币兑换美元为 Topic,在价格相同的情况下,先出价者优先处理,则可以按照 FIFO 的方式发布和消费全局顺序消息。