RocketMQ存储设计剖析

最近在内部准备一个消息中间件的实践分享,顺便整理下RocketMQ的存储设计。存储设计是整个Broker最为重要的部分之一,好比你着手了解一个业务系统的逻辑时,如果先了解其DB表结构设计,再去理解业务代码逻辑就容易的多了。本文通过本地实验及相关文档参考,整理其核心概念的存储设计。

存储流程简介

image-20211205163509696

首先结合官方文档的图片对消息存储的流程有个大致的了解:

  1. 所有生产者都会往Broker指定的Topic发消息,在Broker收到消息后,无论消息属于哪个Topic,都会封装成一个标准格式后追加存储到一个CommitLog文件上。

  2. 存到到CommitLog文件后Broker会将这条消息在CommitLog的物理位置追加存储到一个ConsumeQueue的文件上,每个Topic都有多个ConsumeQueue,默认Broker在追加存储时会轮询这个Topic下的所有ConsumeQueue文件。

通过以上了解可以得知,其实CommitLog是消息的物理存储,而ConsumeQueue是消息的逻辑存储,类似于索引文件。另外在RocketMQ还提供了消息Key查询的功能特性,其实现也是在消息持久化后生成的索引文件IndexFile,在上图没有所有说明。

存储目录结构

RocketMQ的每个Broker启动后,会创建相应的存储目录来存储消息,默认目录在~/store,以下为其存储层级结构:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
store
├── abort
├── checkpoint
├── commitlog
│   ├── 00000000000000000000
│   └── 00000000001073741824
├── config
│   ├── consumerFilter.json
│   ├── consumerFilter.json.bak
│   ├── consumerOffset.json
│   ├── consumerOffset.json.bak
│   ├── delayOffset.json
│   ├── delayOffset.json.bak
│   ├── subscriptionGroup.json
│   ├── topics.json
│   └── topics.json.bak
├── consumequeue
│   └── TopicTest
│   ├── 0
│   │   └── 00000000000000000000
│   ├── 1
│   │   └── 00000000000000000000
│   ├── 2
│   │   └── 00000000000000000000
│   └── 3
│   └── 00000000000000000000
├── index
│   └── 20211204001412810
└── lock

CommitLog设计

CommitLog是一种常见的设计思想,相关内容可以通过Martin Fowler的Write-Ahead Log了解,RocketMQ的CommitLog设计在里面都有相应的体现,如:1)为了防止单个CommitLog文件过大,RocketMQ对CommitLog文件做了分段拆分,默认1个文件为1GB。2)为了防止CommitLog文件无限追加导致存储不足,RocketMQ默认值保留3天的数据,超期的CommitLog会被清理删除。

CommitLog每个文件都以字节的offset来命名,从0开始(固定长度,左边做0填充),如第2个文件名可以通过计算获得,依此类推。

1GB = 1 * 1024 * 1024 * 1024 Byte = 1073741824 Byte

1
2
3
4
➜  ll store/commitlog
total 376
-rw-r--r-- 1 nisiyong staff 1.0G Dec 4 00:14 00000000000000000000
-rw-r--r-- 1 nisiyong staff 1.0G Dec 4 00:14 00000000001073741824

每条消息的编码格式如下表格,并追加到对应的CommitLog文件上,通过表格可得知每条消息的其中的固定部分占90个字节。

顺序 字段名称 数据类型 字节数 字段说明
1 MsgLength Int 4 消息总长度
2 MagicCode Int 4 魔数,固定值0xdaa320a7
3 BodyCrc Int 4 消息内容CRC校验码
4 QueueId Int 4 消息的ConsumeQueue的ID
5 Flag Int 4 消息FLAG,RocketMQ不做处理,供应用程序使用
6 QueueOffset Long 8 消息在ConsumeQueue上的偏移量
7 PhysicalOffset Long 8 消息在CommitLog上的偏移量
8 SysFlag Int 4 消息系统FLAG,如是否压缩,是否为消息等
9 BornTimestamp Long 8 消息在客户端的生成的时间
10 BornHost Long 8 消息在客户端的IP:PORT
11 StoreTimestamp Long 8 消息在服务端Broker的存储时间
12 StoreHost Long 8 消息在服务端Broker的IP:PORT
13 ReconsumeTimes Int 4 消息的重试次数
14 PrepareTransactionOffset Long 8 事物消息的偏移量
15 BodyLength Int 4 消息体的长度
16 Body byte[] array size 消息内容
17 TopicLength byte 1 Topic的长度
18 Topic byte[] array size Topic名称
19 PropertiesLength byte 1 扩展属性长度
20 Properties byte[] array size 扩展属性内容

如下是本地磁盘的通过xxd命令获取的commitlog二进制文本信息,通过魔数daa320a7可以区分出每一条消息的大致位置。image-20211205171926534

ConsumeQueue设计

RocketMQ创建Topic时都会指定需要几个Queue,这些Queue会均衡的分配到各个Broker服务器上。Queue在Topic目录下,名称也类似commitlog按offset来命名。

ConsumeQueue文件主要存储消息的摘要信息,在commitlog之上多了一层逻辑层抽象,便于Topic隔离维护等

默认1个ConsumeQueue文件包含30w个条目,每个条目大小固定共20个字节,结构如下:

顺序 字段名称 数据类型 字节数
1 CommitLogOffset Long 8
2 MsgLength Int 4
3 TagHashCode Long 8

通过计算可得知1个ConsumeQueue文件写满后大小约5.7MB

1
2
3
➜  ll store/consumequeue/TopicTest/0
total 11720
-rw-r--r-- 1 nisiyong staff 5.7M Dec 4 00:14 00000000000000000000

IndexFile设计

RocketMQ为了方便消息检索,支持了用户在发送消息时设置自定的Key,消息在服务端根据Key进行索引构建,在后续的控制台可以通过该Key来查询该消息。

Broker端的IndexFile就是这些Key的索引文件,与上述的文件命名不同,IndexFile是更加时间戳来命名的,方便后续结合时间维度来查询。每个IndexFile由以下3部分组成:

  • IndexHeader,共40个字节
  • SlotTable,每个Slot占4个字节,存放消息Key的hashCode,默认1个IndexFile有500w个Slot
  • IndexItems,每个IndexItem占20个字节,默认1个IndexFile有2000w个IndexItem

通过计算可得知1个Index文件写满后大小约401MB

1
2
3
➜  ll store/index
total 39104
-rw-r--r-- 1 nisiyong staff 401M Dec 5 00:57 20211204001412810

固定40个字节的IndexHeader结构如下:

顺序 字段名称 数据类型 字节数 字段说明
1 BeginTimestamp Long 8 该索引文件内消息的最小存储时间
2 EndTimestamp Long 8 该索引文件内消息的最大存储时间
3 BeginPhyoffset Long 8 该索引文件内消息的最小CommitLog Offset
4 EndPhyoffset Long 8 该索引文件内消息的最大CommitLog Offset
5 HashSlotCount Int 4 该索引文件的Slot数量,默认500w个
6 Index Int 4 该索引文件的IndexItem数量,默认2000w个

每个IndexItem的结构如下:

顺序 字段名称 数据类型 字节数 字段说明
1 HashCode Int 4 消息Key字符串的HashCode
2 Phyoffset Long 8 消息的ComimitLog Offset
3 Timediff Int 4 与第一条消息的时间错差值,小于0该消息无效
4 PreIndexNo Int 4 上一条消息的IndexItem的索引,出现Hash碰撞时,形成链表结构

IndexFile存储过程简单说明下,结合下图重点关注以下几点:

  • 每条消息的Key都能计算出一个4个字节Int类型的HashCode,通过该HashCod%500w得到slot的位置
  • 每条消息的信息可以生成一个IndexItem,只要IndexItems有空间,就直接追加存放,并把该IndexItem的位置索引存放到对应的slot上
  • 当出现hash碰撞时,新的IndexItem需要记录上一个slot的IndexItem位置,然后用新的IndexItem的位置覆盖到slot的位置

image-20211205223634676

总结

至此RocketMQ消息在服务端的存储设计及相关数据结构已经介绍完毕,由于篇幅关系只介绍了消息生产写入的逻辑,而消费端如何消费读取消息,以及消费的进度位点没有详细介绍,这块大家可以结合store/config/consumerOffset.json文件内容进行了解,逻辑比较简单。总体来看,对存储的数据结构有较为清晰的了解对实际使用时帮助比较大,后续在学习一些复杂的中间件时先从总体关注其核心功能设计,再逐步去了解局部的功能逻辑。

References