RocketMQ存储设计剖析
最近在内部准备一个消息中间件的实践分享,顺便整理下RocketMQ的存储设计。存储设计是整个Broker最为重要的部分之一,好比你着手了解一个业务系统的逻辑时,如果先了解其DB表结构设计,再去理解业务代码逻辑就容易的多了。本文通过本地实验及相关文档参考,整理其核心概念的存储设计。
存储流程简介
首先结合官方文档的图片对消息存储的流程有个大致的了解:
所有生产者都会往Broker指定的Topic发消息,在Broker收到消息后,无论消息属于哪个Topic,都会封装成一个标准格式后追加存储到一个
CommitLog
文件上。存到到CommitLog文件后Broker会将这条消息在
CommitLog
的物理位置追加存储到一个ConsumeQueue
的文件上,每个Topic都有多个ConsumeQueue
,默认Broker在追加存储时会轮询这个Topic下的所有ConsumeQueue
文件。
通过以上了解可以得知,其实CommitLog是消息的物理存储,而ConsumeQueue是消息的逻辑存储,类似于索引文件。另外在RocketMQ还提供了消息Key查询的功能特性,其实现也是在消息持久化后生成的索引文件IndexFile,在上图没有所有说明。
存储目录结构
RocketMQ的每个Broker启动后,会创建相应的存储目录来存储消息,默认目录在~/store
,以下为其存储层级结构:
1 | store |
CommitLog设计
CommitLog是一种常见的设计思想,相关内容可以通过Martin Fowler的Write-Ahead Log了解,RocketMQ的CommitLog设计在里面都有相应的体现,如:1)为了防止单个CommitLog文件过大,RocketMQ对CommitLog文件做了分段拆分,默认1个文件为1GB。2)为了防止CommitLog文件无限追加导致存储不足,RocketMQ默认值保留3天的数据,超期的CommitLog会被清理删除。
CommitLog每个文件都以字节的offset来命名,从0开始(固定长度,左边做0填充),如第2个文件名可以通过计算获得,依此类推。
1GB = 1 * 1024 * 1024 * 1024 Byte = 1073741824 Byte
1 | ➜ ll store/commitlog |
每条消息的编码格式如下表格,并追加到对应的CommitLog文件上,通过表格可得知每条消息的其中的固定部分占90个字节。
顺序 | 字段名称 | 数据类型 | 字节数 | 字段说明 |
---|---|---|---|---|
1 | MsgLength | Int | 4 | 消息总长度 |
2 | MagicCode | Int | 4 | 魔数,固定值0xdaa320a7 |
3 | BodyCrc | Int | 4 | 消息内容CRC校验码 |
4 | QueueId | Int | 4 | 消息的ConsumeQueue的ID |
5 | Flag | Int | 4 | 消息FLAG,RocketMQ不做处理,供应用程序使用 |
6 | QueueOffset | Long | 8 | 消息在ConsumeQueue上的偏移量 |
7 | PhysicalOffset | Long | 8 | 消息在CommitLog上的偏移量 |
8 | SysFlag | Int | 4 | 消息系统FLAG,如是否压缩,是否为消息等 |
9 | BornTimestamp | Long | 8 | 消息在客户端的生成的时间 |
10 | BornHost | Long | 8 | 消息在客户端的IP:PORT |
11 | StoreTimestamp | Long | 8 | 消息在服务端Broker的存储时间 |
12 | StoreHost | Long | 8 | 消息在服务端Broker的IP:PORT |
13 | ReconsumeTimes | Int | 4 | 消息的重试次数 |
14 | PrepareTransactionOffset | Long | 8 | 事物消息的偏移量 |
15 | BodyLength | Int | 4 | 消息体的长度 |
16 | Body | byte[] | array size | 消息内容 |
17 | TopicLength | byte | 1 | Topic的长度 |
18 | Topic | byte[] | array size | Topic名称 |
19 | PropertiesLength | byte | 1 | 扩展属性长度 |
20 | Properties | byte[] | array size | 扩展属性内容 |
如下是本地磁盘的通过xxd
命令获取的commitlog二进制文本信息,通过魔数daa320a7
可以区分出每一条消息的大致位置。
ConsumeQueue设计
RocketMQ创建Topic时都会指定需要几个Queue,这些Queue会均衡的分配到各个Broker服务器上。Queue在Topic目录下,名称也类似commitlog按offset来命名。
ConsumeQueue文件主要存储消息的摘要信息,在commitlog之上多了一层逻辑层抽象,便于Topic隔离维护等
默认1个ConsumeQueue文件包含30w个条目,每个条目大小固定共20个字节,结构如下:
顺序 | 字段名称 | 数据类型 | 字节数 |
---|---|---|---|
1 | CommitLogOffset | Long | 8 |
2 | MsgLength | Int | 4 |
3 | TagHashCode | Long | 8 |
通过计算可得知1个ConsumeQueue文件写满后大小约5.7MB
1 | ➜ ll store/consumequeue/TopicTest/0 |
IndexFile设计
RocketMQ为了方便消息检索,支持了用户在发送消息时设置自定的Key,消息在服务端根据Key进行索引构建,在后续的控制台可以通过该Key来查询该消息。
Broker端的IndexFile就是这些Key的索引文件,与上述的文件命名不同,IndexFile是更加时间戳来命名的,方便后续结合时间维度来查询。每个IndexFile由以下3部分组成:
- IndexHeader,共40个字节
- SlotTable,每个Slot占4个字节,存放消息Key的hashCode,默认1个IndexFile有500w个Slot
- IndexItems,每个IndexItem占20个字节,默认1个IndexFile有2000w个IndexItem
通过计算可得知1个Index文件写满后大小约401MB
1 | ➜ ll store/index |
固定40个字节的IndexHeader结构如下:
顺序 | 字段名称 | 数据类型 | 字节数 | 字段说明 |
---|---|---|---|---|
1 | BeginTimestamp | Long | 8 | 该索引文件内消息的最小存储时间 |
2 | EndTimestamp | Long | 8 | 该索引文件内消息的最大存储时间 |
3 | BeginPhyoffset | Long | 8 | 该索引文件内消息的最小CommitLog Offset |
4 | EndPhyoffset | Long | 8 | 该索引文件内消息的最大CommitLog Offset |
5 | HashSlotCount | Int | 4 | 该索引文件的Slot数量,默认500w个 |
6 | Index | Int | 4 | 该索引文件的IndexItem数量,默认2000w个 |
每个IndexItem的结构如下:
顺序 | 字段名称 | 数据类型 | 字节数 | 字段说明 |
---|---|---|---|---|
1 | HashCode | Int | 4 | 消息Key字符串的HashCode |
2 | Phyoffset | Long | 8 | 消息的ComimitLog Offset |
3 | Timediff | Int | 4 | 与第一条消息的时间错差值,小于0该消息无效 |
4 | PreIndexNo | Int | 4 | 上一条消息的IndexItem的索引,出现Hash碰撞时,形成链表结构 |
IndexFile存储过程简单说明下,结合下图重点关注以下几点:
- 每条消息的Key都能计算出一个4个字节Int类型的HashCode,通过该HashCod%500w得到slot的位置
- 每条消息的信息可以生成一个IndexItem,只要IndexItems有空间,就直接追加存放,并把该IndexItem的位置索引存放到对应的slot上
- 当出现hash碰撞时,新的IndexItem需要记录上一个slot的IndexItem位置,然后用新的IndexItem的位置覆盖到slot的位置
总结
至此RocketMQ消息在服务端的存储设计及相关数据结构已经介绍完毕,由于篇幅关系只介绍了消息生产写入的逻辑,而消费端如何消费读取消息,以及消费的进度位点没有详细介绍,这块大家可以结合store/config/consumerOffset.json
文件内容进行了解,逻辑比较简单。总体来看,对存储的数据结构有较为清晰的了解对实际使用时帮助比较大,后续在学习一些复杂的中间件时先从总体关注其核心功能设计,再逐步去了解局部的功能逻辑。