RocketMQ源码之消息存储!

整体架构:

RocketMQ的混合型存储结构(多个Topic的消息实体内容都存储于一个CommitLog中)。

针对Producer和Consumer分别采用了数据和索引部分相分离的存储结构。

Producer发送消息至Broker端,然后Broker端使用同步或者异步的方式对消息刷盘持久化,保存至CommitLog中。

img

核心步骤:

1、首先,生产者根据Topic发送消息,消息存储在commitLog中,1G一个文件,当文件满了,写入下一个文件。

2、其次,ReputMessageService重写消息服务执行2个分发操作

  • 创建ConsumerQueue逻辑消费队列:
    • 参数:commitLogOffset :物理偏移量、msgSize:消息长度、tagsCode:tag哈希。
  • 创建IndexFile索引文件:
    • 以创建时的时间戳命名。

3、最后,消费者根据TopicTag拉取消息消费,根据key查询消息。

重要文件:

commitLog消息日志:

  • 消息主体以及元数据的存储主体,存储Producer端写入的消息主体内容。

consumequeue逻辑消费队列:

  • 存储了commitLog的起始物理offset,目的是提高消息消费的性能。

indexFile索引文件:

  • 提供了一种可以通过key或时间区间来查询消息的方法。

consumequeue文件:

consumequeue文件采取定长设计,每一个条目共20个字节。

  • 分别为8字节的commitlog物理偏移量、4字节的消息长度、8字节tag hashcode,单个文件由30W个条目组成。

可以像数组一样随机访问每一个条目,每个ConsumeQueue文件大小约5.72M。

  • 默认一个topic对应4个queueId,即4个messageQueue。

每个messageQueue文件夹下有多个consumeQueue,所以:messageQueue 1:N consumeQueue

img