相关文章:
- RocketMQ延时消息解析:https://mp.weixin.qq.com/s/ta2f5-T63so3AJvuOW-BlQ
- RocketMQ消息重试机制解析:https://mp.weixin.qq.com/s/Ei-eYFfE0devt_vq0ZRfuw
单体架构下的事务
在单体系统的开发过程中,假如某个场景下需要对数据库的多张表进行操作,为了保证数据的一致性,一般会使用事务,将所有的操作全部提交或者在出错的时候全部回滚。
以创建订单为例,假设下单后需要做两个操作:
在订单表生成订单。
在积分表增加本次订单增加的积分记录。
在单体架构下只需使用@Transactional
开启事务,就可以保证数据的一致性。
@Transactional
public void order() {
String orderId = UUID.randomUUID().toString();
// 生成订单
orderService.createOrder(orderId);
// 增加积分
creditService.addCredits(orderId);
}
但在分布式架构下,订单系统和积分系统可能是两个独立的服务,此时就不能使用上述的方法开启事务了,因为它们不处于同一个事务中。
- 在出错的情况下,无法进行全部回滚,只能对当前服务的事务进行回滚。
所以就有可能出现订单生成成功但是积分服务增加积分失败的情况(也可能相反),此时数据处于不一致的状态。
分布式架构下的事务
以下单流程为例,在分布式架构下的处理流程如下:
订单服务生成订单。
发送订单生成的
MQ
消息,积分服务订阅消息,有新的订单生成之后消费消息,增加对应的积分记录。
普通MQ消息存在的问题
假如订单创建成功,
MQ
消息发送成功,但是order
方法在返回的前一刻,服务突然宕机。由于开启了事务,事务还未提交(方法结束后才会正常提交)。
所以订单表并未生成记录,但是
MQ
却已经发送成功并且被积分服务消费,此时就会存在订单未创建但是积分记录增加的情况。假如先发送
MQ
消息再创建订单,如果MQ消息发送成功,创建订单失败,那么同样处于不一致的状态。
@Transactional
public void order() {
String orderId = UUID.randomUUID().toString();
// 创建订单
Order order = orderService.createOrder(orderDTO.getOrderId());
// 发送订单创建的MQ消息
sendOrderMessge(order);
return;
}
可以使用RocketMQ
事务消息解决上述问题。
RocketMQ事务消息基础流程
Apache RocketMQ
在4.3.0
版中已经支持分布式事务消息。事务消息是
RocketMQ
提供的一种消息类型,支持在分布式场景下保障消息生产和本地事务的最终一致性。
RocketMQ
采用了2PC
的思想来实现了提交事务消息,同时增加一个补偿逻辑来处理二阶段超时或者失败的消息。
基本流程
第一阶段:
- 发送 Message,
Half Message
,即半事务消息。- 此类型的
Message
是不会被Consumer
消费。第二阶段:如果半事务消息投递成功,则会开始执行本地事务。
分为如下三种
Case
:
本地事务执行成功:
- 会向
Broker
发送commit
消息,被commit
过后的Message
才能被Consumer
消费到。本地事务执行失败:
- 会向
Broker
发送rollback
消息,Broker
则会将刚刚投递的半事务消息删除,从而保证上下游数据的一致性。如果
Producer
实例或者网络出现了问题,Producer
没能及时地将本地事务执行的结果通知Broker
。
Broker
会通过扫描发现某条Message
长时间处于半事务消息状态。
Broker
会主动地向Producer
询问此Message
对应的事务状态。
值得注意的是:
RocketMQ
并不会无休止的的信息事务状态回查,默认回查 15 次。如果 15 次回查还是无法得知事务状态,
RocketMQ
默认回滚该消息。
RocketMQ事务消息使用限制
事务消息不支持延时消息和批量消息。
事务性消息可能不止一次被检查或消费,所以消费者端需要做好消费幂等。
事务消息的生产者
ID
不能与其他类型消息的生产者ID
共享。
- 与其他类型的消息不同,事务消息允许反向查询、
MQ
服务器能通过它们的生产者ID
查询到消费者。
RocketMQ事务消息基本原理
采用2PC
两阶段设计。
将
Message
原本真实的Topic
和MessageQueue
进行备份。
- 放入到
PROPERTY_REAL_TOPIC
、PROPERTY_REAL_QUEUE_ID
中保存。将消息投递到一个内部
Topic
中RMQ_SYS_TRANS_HALF_TOPIC
,该队列专门存储事务消息。所有的
Half Message
全部都写入到queueId
为 0 的MessageQueue
。因为一个
Topic
下只有 1 个MessageQueue
:
- 这个
Topic
下的所有Message
就是全局有序的,它们会按照先来后到的顺序被消费。如果本地事务执行成功进行
Commit
,则将RMQ_SYS_TRANS_HALF_TOPIC
队列中的消息投递到真实的Topic
中,供后续流程执行。
- 并删除这条
Half Message
,但删除也是假删除,只是给Message
打上一个删除的Tag
。如果本地事务执行失败进行
rollback
,则直接删除这条Half Message
,但删除也是假删除。如果本地事务迟迟没有返回结果 (默认时间是6s),则会触发事务回查机制
- 执行回查之前需要校验检查次数是否到达了最大值(需要手动设置,没有默认值)。
- 或者是当前
Half Message
存在是否超过了Message
保存的上限,即 3天。- 如果满足上面条件中的一种
Half Message
会被放进TRANS_CHECK_MAX_TIME_TOPIC
Topic 当中。- 一旦判定为需要执行事务回查逻辑,那么当前这条
Half Message
就算已经被消费了。- 在没达到最大的校验次数之前,都还需要将其投递到事务队列当中,以便下次重试时再次执行
Check
逻辑。- 如果回查成功则删除投递的
Half Message
。
源码解读
发送事务消息调用的是TransactionMQProducer
的sendMessageInTransaction
方法:
主要有以下几个步骤:
获取事务监听器
TransactionListener
,如果获取为空或者本地事务执行器LocalTransactionExecuter
为空将抛出异常。因为需要通过
TransactionListener
或者LocalTransactionExecuter
来执行本地事务,所以不能为空。在消息中设置
prepared
属性,此时与普通消息(非事务消息)相比多了PROPERTY_TRANSACTION_PREPARED
属性。调用
send
方法发送prepared
消息也就是half
消息,发送消息的流程与普通消息一致。根据消息的发送结果判断:
- 如果发送成功执行本地事务,并返回本地事务执行结果状态,如果返回的执行状态结果为空,将本地事务状态设置为
UNKNOW
。- 发送成功之外的其他情况,包括
FLUSH_DISK_TIMEOUT
刷盘超时、FLUSH_SLAVE_TIMEOUT
和SLAVE_NOT_AVAILABLE
从节点不可用三种情况。- 此时意味着
half
消息发送失败,本地事务状态置为ROLLBACK_MESSAGE
回滚消息。调用
endTransaction
方法结束事务。
参考
《RocketMQ技术内幕》
https://github.com/apache/rocketmq/blob/master/docs/cn/RocketMQ_Example.md
https://github.com/apache/rocketmq/blob/master/docs/cn/design.md