RocketMQ之六:RocketMQ消息存储
⼀、RocketMQ的消息存储基本介绍
先看⼀张图:
1、Commit log存储消息实体。顺序写,随机读。
2、Message queue存储消息的偏移量。读消息先读message queue,根据偏移量到commit log读消息本⾝。
3、索引队列⽤来存储消息的索引key
使⽤mmap⽅式减少内存拷贝,提⾼读取性能。具体实现:FileChannel.map(RandomAccessFile)
CommitLog以物理⽂件的⽅式存放,每台Broker上的CommitLog被本机器所有ConsumeQueue共享。
在CommitLog,⼀个消息的存储长度是不固定的,RocketMQ采⽤了⼀些机制,尽量向CommitLog中顺序写,但是随即读。
[ 磁盘存储的“快”——顺序写 ]
磁盘存储,使⽤得当,磁盘的速度完全可以匹配上⽹络的数据传输速度,⽬前的⾼性能磁盘,顺序写速度可以达到600MB/s,超过了⼀般⽹卡的传输速度。
[ 磁盘存储的“慢”——随机写 ]
磁盘的随机写的速度只有100KB/s,和顺序写的性能差了好⼏个数量级。
[ 存储机制这样设计的好处——顺序写,随机读 ]
1.CommitLog顺序写,可以⼤⼤提⾼写⼊的效率;
2.虽然是随机读,但是利⽤package机制,可以批量地从磁盘读取,作为cache存到内存中,加速后续的读取速度。
3.为了保证完全的顺序写,需要ConsumeQueue这个中间结构,因为ConsumeQueue⾥只存储偏移量信息,所以尺⼨是有限的。在实际情况中,⼤部分ConsumeQueue能够被全部读⼊内存,所以这个中间结构的操作速度很快,可以认为是内存读取的速度。
[ 如何保证CommitLog和ConsumeQueue的⼀致性? ]
CommitLog⾥存储了Consume Queues、Message Queue、Tag等所有信息,即使ConsumeQueue丢失,也可以通过commitLog完全恢复出来。
RocketMQ的Broker机器磁盘上的⽂件存储结构
1.1、RocketMQ的消息存储主要有如下概念:
(1)CommitLog:消息主体以及元数据的存储主体,存储Producer端写⼊的消息主体内容。单个⽂件⼤⼩默认1G ,⽂件名长度为20位,左边补零,剩余为起始偏移量,⽐如00000000000000000000代表了第⼀个⽂件,起始偏移量为0,⽂件⼤⼩为1G=1073741824;当第⼀个⽂件写满了,第⼆个⽂件为00000000001073741824,起始偏移量为1073741824,以此类推。消息主要是顺序写⼊⽇志⽂件,当⽂件满了,写⼊下⼀个⽂件;
(2) ConsumeQueue:消息消费的逻辑队列,作为消费消息的索引,保存了指定Topic下的队列消息在
CommitLog中的起始物理偏移量offset,消息⼤⼩size和消息Tag的HashCode值。⽽IndexFile(索引⽂件)则只是为了消息查询提供了⼀种通过key或时间区间来查询消息的⽅法(ps:这种通过IndexFile来查消息的⽅法不影响发送与消费消息的主流程)。从实际物理存储来说,ConsumeQueue对应每个Topic和QueuId下⾯的⽂件。单个⽂件⼤⼩约5.72M,每个⽂件由30W条数据组成,每个⽂件默认⼤⼩为600万个字节,当⼀个ConsumeQueue类型的⽂件写满了,则写⼊下⼀个⽂件;
(3)IndexFile:因为所有的消息都存在CommitLog中,如果要实现根据 key 查询消息的⽅法,就会变得⾮常困难,所以为了解决这种业务需求,有了IndexFile的存在。⽤于为⽣成的索引⽂件提供访问服务,通过消息Key值查询消息真正的实体内容。在实际的物理存储上,⽂件名则是以创建时的时间戳命名的,固定的单个IndexFile⽂件⼤⼩约为
400M,⼀个IndexFile可以保存 2000W个索引;
(4)MapedFileQueue:对连续物理存储的抽象封装类,源码中可以通过消息存储的物理偏移量位置快速定位该offset所在MappedFile(具体物理存储位置的抽象)、创建、删除MappedFile等操作;
(5)MappedFile:⽂件存储的直接内存映射业务抽象封装类,源码中通过操作该类,可以把消息字节写⼊PageCache缓存区(commit),或者原⼦性地将消息持久化的刷盘(flush);
1.2、RocketMQ消息刷盘的主要过程
CommitLog写⼊:
MapedFileQueue 存储队列,数据定时删除,⽆限增长。
队列有多个⽂件(MapedFile)组成
当消息到达broker时,需要获取最新的MapedFile写⼊数据,调⽤MapedFileQueue的getLastMapedFile获取,此函数如果集合中⼀个也没有创建⼀个,如果最后⼀个写满了也创建⼀个新的。
MapedFileQueue在获取getLastMapedFile时,如果需要创建新的MapedFile会计算出下⼀个MapedFile⽂件地址,通过预分配服务AllocateMapedFileService异步预创建下⼀个MapedFile⽂件,这样下次创建新⽂件请求就不要等待,因为创建⽂件特别是⼀个1G的⽂件还是有点耗时的,
后续如果是异步刷盘还需要将mapedFile中的消息序列化到commitLog物理⽂件
consumeQueue写⼊:
也采⽤mappedFile⽂件内存映射。
底层也⽤与commitLog相同的MapedFileQueue数据结构。
consume queue中存储单元是⼀个20字节定长的数据,是顺序写顺序读
(1)      commitLogOffset是指这条消息在commitLog⽂件实际偏移量
(2)      size就是指消息⼤⼩
(3)消息tag的哈希值
在RocketMQ中消息刷盘主要可以分为同步刷盘和异步刷盘两种:
(1)同步刷盘:如上图所⽰,只有在消息真正持久化⾄磁盘后,RocketMQ的Broker端才会真正地返回给Producer端⼀个成功的ACK响应。同步刷盘对MQ消息可靠性来说是⼀种不错的保障,但是性能上会有较⼤影响,⼀般适⽤于⾦融业务应⽤领域。RocketMQ同步刷盘的⼤致做法是,基于⽣产者消费者模型,主线程创建刷盘请求实例—GroupCommitRequest并在放⼊刷盘写队列后唤醒同步刷盘线程—GroupCommitService,来执⾏刷盘动作(其中⽤了CAS变量和CountDownLatch来保证线程间的同步)。这
⾥,RocketMQ源码中⽤读写双缓存队列(requestsWrite/requestsRead)来实现读写分离,其带来的好处在于内部消费⽣成的同步刷盘请求可以不⽤加锁,提⾼并发度。
(2)异步刷盘:能够充分利⽤OS的PageCache的优势,只要消息写⼊PageCache即可将成功的ACK返回给Producer端。消息刷盘采⽤后台异步线程提交的⽅式进⾏,降低了读写延迟,提⾼了MQ的性能和吞吐量。异步和同步刷盘的区别在于,异步刷盘时,主线程并不会阻塞,在将刷盘线程wakeup后,就会继续执⾏。
1.3、⼏个主要的组件说明
1.3.1、ConsumeQueue
consumeQueue是消息的逻辑队列,相当于字典的⽬录,⽤来指定消息在物理⽂件commitLog上的位置。其中包含了这个MessageQueue在CommitLog中的起始物理位置偏移量offset,消息实体内容的⼤⼩和Message Tag的哈希值。从实际物理存储来说,ConsumeQueue对应每个Topic和QueuId下⾯的⽂件。单个⽂件⼤⼩约5.72M,每个⽂件由30W条数据组成,每个⽂件默认⼤⼩为600万个字节,当⼀个ConsumeQueue类型的⽂件写满了,则写⼊下⼀个⽂件;
我们可以在配置中指定consumequeue与commitlog存储的⽬录
每个topic下的每个queue都有⼀个对应的consumequeue⽂件,⽐如:
${rocketmq.home}/store/consumequeue/${topicName}/${queueId}/${fileName}
Consume Queue⽂件组织,如图所⽰:
Consume Queue⽂件组织⽰意图
1. 根据topic和queueId来组织⽂件,图中TopicA有两个队列0,1,那么TopicA和QueueId=0组成⼀个ConsumeQueue,TopicA和QueueId=1组成另⼀个ConsumeQueue。
2. 按照消费端的GroupName来分组重试队列,如果消费端消费失败,消息将被发往重试队列中,⽐如图中的%RETRY%ConsumerGroupA。
3. 按照消费端的GroupName来分组死信队列,如果消费端消费失败,并重试指定次数后,仍然失败,则发往死信队列,⽐如图中的%DLQ%ConsumerGroupA。
死信队列(Dead Letter Queue)⼀般⽤于存放由于某种原因⽆法传递的消息,⽐如处理失败或者已经过期的消息。
Consume Queue中存储单元是⼀个20字节定长的⼆进制数据,顺序写顺序读,如下图所⽰:
Queue单个存储单元结构
consume queue⽂件存储单元格式
1. CommitLog Offset是指这条消息在Commit Log⽂件中的实际偏移量
2. Size存储中消息的⼤⼩
3. Message Tag HashCode存储消息的Tag的哈希值:主要⽤于订阅时消息过滤(订阅时如果指定了Tag,会根据HashCode来快速查到订阅的消息)
1.3.2、Commit Log
CommitLog:消息主体以及元数据的存储主体,存储Producer端写⼊的消息主体内容。消息存放的物理⽂件,每台broker上的commitlog被本机所有的queue共享,不做任何区分。⽂件的默认位置如下,仍然可通过配置⽂件修改:
${user.home} \store\${commitlog}\${fileName}
CommitLog的消息存储单元长度不固定,⽂件顺序写,随机读。消息的存储结构如下表所⽰,按照编号顺序以及编号对应的内容依次存储。
1.3.3、IndexFile消息的索引⽂件
IndexFile:⽤于为⽣成的索引⽂件提供访问服务,通过消息Key值查询消息真正的实体内容。在实际的物理存储上,⽂件名则是以创建时的时间戳命名的,固定的单个IndexFile ⽂件⼤⼩约为400M,⼀个IndexFile可以保存 2000W个索引;
indexFile存放的位置:${rocketmq.home}/store/index/indexFile(年⽉⽇时分秒等组成⽂件名)
Index⽣成过程
上⼀篇(www.jianshu/p/606d4b77d504)讲ConsumeQueue的时候,有⼀个ReputMessageService在分发消息的时候还会调⽤CommitLogDispatcherBuildIndex⽤来创建index。这个类实现就是直接调⽤的IndexService.buildIndex()
如果⼀个消息包含key值的话,会使⽤IndexFile存储消息索引,⽂件的内容结构如图:
消息索引
索引⽂件主要⽤于根据key来查询消息的,流程主要是:
1. 根据查询的 key 的 hashcode%slotNum 得到具体的槽的位置(slotNum 是⼀个索引⽂件⾥⾯包含的最⼤槽的数⽬,例如图中所⽰ slotNum=5000000)
2. 根据 slotValue(slot 位置对应的值)查到索引项列表的最后⼀项(倒序排列,slotValue 总是指向最新的⼀个索引项)
3. 遍历索引项列表返回查询时间范围内的结果集(默认⼀次最⼤返回的 32 条记录)
⼆、RocketMQ的消息存储原理
消息存储是MQ消息队列中最为复杂和最为重要的⼀部分,本⽂先从⽬前⼏种⽐较常⽤的MQ消息队列存储⽅式出发,为⼤家介绍RocketMQ选择磁盘⽂件存储的原因。然后,本⽂分别从RocketMQ的消息存储整体架构和RocketMQ⽂件存储模型层次结构两⽅⾯进⾏深⼊分析介绍。使得⼤家读完本⽂后对RocketMQ消息存储部分有⼀个⼤致的了解和认识。
2.1、MQ消息队列的⼀般存储⽅式
当前业界⼏款主流的MQ消息队列采⽤的存储⽅式主要有以下三种⽅式:
(1)分布式KV存储:这类MQ⼀般会采⽤诸如levelDB、RocksDB和Redis来作为消息持久化的⽅式,由于分布式缓存的读写能⼒要优于DB,所以在对消息的读写能⼒要求都不是⽐较⾼的情况下,采⽤这种⽅式倒也不失为⼀种可以替代的设计⽅案。消息存储于分布式KV需要解决的问题在于如何保证MQ整体的可靠性?
(2)⽂件系统:⽬前业界较为常⽤的⼏款产品(RocketMQ/Kafka/RabbitMQ)均采⽤的是消息刷盘⾄所部署虚拟机/物理机的⽂件系统来做持久化(刷盘⼀般可以分为异步刷盘和同步刷盘两种模式)。⼩编认为,消息刷盘为消息存储提供了⼀种⾼效率、⾼可靠性和⾼性能的数据持久化⽅式。除⾮部署MQ机器本⾝或是本地磁盘挂了,否则⼀般是不会出现⽆法持久化的故障问题。
(3)关系型数据库DB:Apache下开源的另外⼀款MQ—ActiveMQ(默认采⽤的KahaDB做消息存储)可选⽤JDBC的⽅式来做消息持久化,通过简单的xml配置信息即可实现JDBC消息存储。由于,普通关系型数据库(如Mysql)在单表数据量达到千万级别的情况下,其IO读写性能往往会出现瓶颈。因此,如果要选型或者⾃研⼀款性能强劲、吞吐量⼤、消息堆积能⼒突出的MQ消息队列,那么⼩编并不推荐采⽤关系型数据库作为消息持久化的⽅案。在可靠性⽅⾯,该种⽅案⾮常依赖DB,如果⼀旦DB出现故障,则MQ的消息就⽆法落盘存储会导致线上故障;
因此,综合上所述从存储效率来说,⽂件系统>分布式KV存储>关系型数据库DB,直接操作⽂件系统肯定是最快和最⾼效的,⽽关系型数据库TPS⼀般相⽐于分布式KV系统会更低⼀些(简略地说,关系型数据库本⾝也是⼀个需要读写⽂件server,这时MQ作为client与其建⽴连接并发送待持久化的消息数据,同时⼜需要依赖DB的事务等,这⼀系列操作都⽐较消耗性能),所以如果追求⾼效的IO读写,那么选择操作⽂件系统会更加合适⼀些。但是如果从易于实现和快速集成来看,关系型数据库DB>分布式KV存储>⽂件系统,但是性能会下降很多。
另外,从消息中间件的本⾝定义来考虑,应该尽量减少对于外部第三⽅中间件的依赖。⼀般来说依赖的外部系统越多,也会使得本⾝的设计越复杂,所以⼩编个⼈的理解是采⽤⽂件系统作为消息存储的⽅式,更贴近消息中间件本⾝的定义。
2.2、RocketMQ消息存储整体架构
消息存储实现,⽐较复杂,也值得⼤家深⼊了解,后⾯会单独成⽂来分析,这⼩节只以代码说明⼀下具体的流程。
// Set the storage time
msg.setStoreTimestamp(System.currentTimeMillis());
/
/ Set the message body BODY CRC (consider the most appropriate setting
msg.Body()));
StoreStatsService storeStatsService = StoreStatsService();
synchronized (this) {
rabbitmq rocketmq kafka区别long beginLockTimestamp = SystemClock().now();
// Here settings are stored timestamp, in order to ensure an orderly global
msg.setStoreTimestamp(beginLockTimestamp);
// MapedFile:操作物理⽂件在内存中的映射以及将内存数据持久化到物理⽂件中
MapedFile mapedFile = LastMapedFile();
// 将Message追加到⽂件commitlog
result = mapedFile.appendMessage(msg, this.appendMessageCallback);
switch (Status()) {
case PUT_OK:break;
case END_OF_FILE:
// Create a new file, re-write the message
mapedFile = LastMapedFile();
result = mapedFile.appendMessage(msg, this.appendMessageCallback);
break;
DispatchRequest dispatchRequest = new DispatchRequest(
topic,// 1
queueId,// 2
tagsCode,// 5
/**
* Transaction
*/
// 1.分发消息位置到ConsumeQueue
/
/ 2.分发到IndexService建⽴索引
this.defaultMessageStore.putDispatchRequest(dispatchRequest);
}
(1)RocketMQ消息存储结构类型及缺点
上图即为RocketMQ的消息存储整体架构,RocketMQ采⽤的是混合型的存储结构,即为Broker单个实例下所有的队列共⽤⼀个⽇志数据⽂件(即为CommitLog)来存储。⽽Kafka采⽤的是独⽴型的存储结构,每个队列⼀个⽂件。这⾥⼩编认为,RocketMQ采⽤混合型存储结构的缺点在于,会存在较多的随机读操作,因此读的效率偏低。同时消费消息需要依赖ConsumeQueue,构建该逻辑消费队列需要⼀定开销。