RocketMQ 实现了灵巧的多分区和多副本机制,有效的避免了集群内单点故障对付整体做事可用性的影响。存储机制和高可用策略是 RocketMQ 稳定性的核心,社区上关于 RocketMQ 目前存储实现的剖析与谈论一贯是一个热议的话题。本文想从一个不一样的视角,着重于谈谈我眼中的这种存储实现是在办理哪些繁芜的问题,因此我从本文最初的版本中删去了冗杂的代码细节剖析,由浅入深的剖析存储机制的毛病与优化方向。
RocketMQ 的架构模型与存储分类先来大略先容下 RocketMQ 的架构模型。RocketMQ 是一个范例的发布订阅系统,通过 Broker 节点中转和持久化数据,解耦高下游。Broker 是真实存储数据的节点,由多个水平支配但不一定完备对等的副本组构成,单个副本组的不同节点的数据会达到终极同等。对付单个副本组来说同一韶光最多只会有一个可读写的 Master 和多少个只读的 Slave,主故障时须要选举来进行单点故障的容错,此时这个副本组是可读不可写的。NameServer 是独立的一个无状态组件,接管 Broker 的元数据注册并动态掩护着一些映射关系,同时为客户端供应做事创造的能力。在这个模型中,我们利用不同主题 (Topic) 来区分不同种别信息流,为消费者设置订阅组 (Group) 进行更好的管理与负载均衡。
如下图中间部分所示:

RocketMQ 目前的存储实现可以分为几个部分:
1、元数据管理
详细指当前存储节点的主题 Topic,订阅组 Group,消费进度 ConsumerOffset。多个配置文件 Config,以及为了故障规复的存储 Checkpoint 和 FileLock。用来记录副本主备身份的 Epoch / SN (sequence number) 文件等(5.0-beta 引入,也可以看作 term)2、数据管理,包括存储的文件 CommitLog,文件版定时的 TimerLog。
3、索引数据管理,包括按行列步队的顺序索引 ConsumeQueue 和随机索引 IndexFile。
1. 元数据管理与优化
为了提升整体的吞吐量与供应跨副本组的高可用能力,RocketMQ 做事端一样平常会为单个 Topic 创建多个逻辑分区,即在多个副本组上各自掩护部分分区 (Partition),我们把它称为行列步队 (MessageQueue)。同一个副本组上同一个 Topic 的行列步队数相同并从 0 开始连续编号,不同副本组上的 MessageQueue 数量可以不同。
例如 topic-a 可以在 broker-1 主副本上有 4 个行列步队,编号 (queueId) 是 0-3,在 broker-1 备副本上完备相同,但是 broker-2 上可能就只有 2 个行列步队,编号 0-1。在 Broker 上元数据的组织管理办法是与上述模型匹配的,每一个 Topic 的 TopicConfig,包含了几个核心的属性,名称,读写行列步队数,权限与许多元数据标识,这个模型类似于 K8s 的 StatefulSet,行列步队从 0 开始编号,扩缩行列步队都在尾部操作(例如 24 个行列步队缩分区到 16,是留下了编号为 0-15 的分区)。这使得我们无需像 Kafka 一样对每个分区单独掩护状态机,同时大幅度的简化了关于分区的实现。
我们会在存储节点的内存中大略的掩护 Map<String, TopicConfig> 的构造来将 TopicName 直接映射到它的详细参数。这个设计足够的大略,也隐含了一些毛病,例如它没有实现一个原生 Namespace 机制来实现存储层面上多租户环境下的元数据的隔离,这也是 RocketMQ 5.0 向云原生时期迈进过程中一个主要的演进方向。
当 Broker 吸收到外部管控命令,例如创建或删除一些 Topic,这个内存 Map 中就会对应的更新或者删除一个 KV 对,须要急速序列化一次并向磁盘覆盖,否则就会造成丢失更新。对付单租户的场景下,Topic (Key) 的数量不会超过几千个,文件大小也只有数百 KB,速率是非常快。但是在云上大多租的场景下,一个存储节点的 Topic 可以达到十几 MB。每次变更一个 KV 就全量向磁盘覆盖写这个大文件,这个操作的开销非常高,尤其是在数据须要跨集群,跨节点迁移,或者应急情形下扩容逃生场景下,同步写文件严重延长了外围管控命令的相应韶光,也成为云上大共享模式下严厉的寻衅之一。在这个背景下,两个办理方案很自然的就产生了,即批量更新接口和增量更新机制。
批量更新指每次做事端可以接管一批 TopicConfig 的更新,这样 Broker 刷写文件的频率就显著的降落。增量更新指将这个 Map 的持久化换成逻辑更换成 KV 型的数据库或实现元数据的 Append 写,以 Compaction 的形式掩护同等性。除了最主要的 Topic 信息,Broker 还管理着 Group 信息,消费组的消费进度 ConsumerOffset 和多个配置文件。Group 的变更和 Topic 类似,都是只有新建或者删除时才须要持久化。而 ConsumeOffset 是用来掩护每个订阅组的消费进度的,构造如 Map<String/ topicName@groupId /, Map>。这里我们从文件本身的浸染和数据构造的角度进行剖析下,Topic Group 虽然数量多,但是变革的频率还是比较低的,而提交与持久化位点时时刻刻都在进行,进而导致这个 Map 险些在实时更新,但是上一次更新后的数据 (last commit offset) 对当前来说又没有什么用,并且许可丢少量更新。以是这里 RocketMQ 没有像 Topic Group 那样采纳数据变革时刷写文件,而是利用一个定时任务对这个 Map 做 CheckPoint。这个周期默认是 5 秒,以是当做事端主备切换或者正常发布时,都会有秒级的重复。
那么这里还有没有优化的空间呢?事实上大部分的订阅组都是不在线的,每次我们也只须要更新位点有变革的这部分订阅组。以是这里我们可以采纳一个差分优化的策略(参加过 ACM 的选手该当更熟习,搜索差分数据传输),在主备同步 Offset 或者持久化的时候只更新变革的内容。如果斯时我们除了知道当前的 Offset,还须要一个历史 Offset 的提交记录怎么办,这种情形下,也利用一个内置的系统 Topic 来保存每次提交(某种意义上的自举实现,Kafka 便是利用一个内部 Topic 来保存位点),通过回放或查找来追溯消费进度。由于 RocketMQ 支持海量 Topic,元数据的规模会更加大,采取目前的实现开销更小。以是选用哪种实现完备是由我们所面对的需求决定的,实现也可以是灵巧多变的。当然,在 RocketMQ 元数据管理上,如何在上层担保分布式环境下多个副本组上的数据同等又是其余一个令人头疼的难题,后续文章会更加详细的谈论这点。
数据管理很多文章都提到 RocketMQ 存储的核心是一个极致优化的顺序写盘,以 append only 的形式不断的将新的追加到文件末端。
RocketMQ 利用了一种称为 MappedByteBuffer 的内存映射文件的办法,将一个文件映射到进程的地址空间,实现文件的磁盘地址和进程的一段虚拟地址关联,实际上是利用了NIO 中的 FileChannel 模型。在进行这种绑定后,用户进程就可以用指针(偏移量)的形式写入磁盘而不用进行 read / write 的系统调用,减少了数据在缓冲区之间来回拷贝的开销。当然这种内核实现的机制有一些限定,单个 mmap 的文件不能太大 (RocketMQ 选择了 1G),此时再把多个 mmap 的文件用一个链表串起来构成一个逻辑行列步队 (称为 MappedFileQueue),就可以在逻辑上实现一个无需考虑长度的存储空间来保存全部的。
这里不同 Topic 的直接进行稠浊的 append only 写,比较于随机写来说性能的提升非常显著的。还有一个主要的细节,这里的稠浊写的写放大非常低。当我们转头去看 Google 实现的 BigTable 的理论模型,各种 LSM 树及其变种,都是将原来的直接掩护树转为增量写的办法来担保写性能,再叠加周期性的异步合并来减少文件的个数,这个动作也称为 Compaction。RocksDB 和 LevelDB 在写放大,读放大,空间放大都有几倍到几十倍的开销。得益于本身的不可变性,和非堆积的场景下,数据一旦写入中间代理 Broker 很快就会被下贱消费掉的特性,此时我们不须要在写入时就掩护 memTable,避免了数据的分发与重修。比较于各种数据库的存储引擎,这样近似 FIFO 的实现可以节省大量的资源,同时减少了 CheckPoint 的繁芜度。对付同一个副本组上的多个副本之间的数据复制都是全部由存储层自行管理,这个设计类似于 bigtable 和 GFS,azure 的 Partation layer,也被称为 Layered Replication 分层架构。
2.1 单条的存储格式
RocketMQ有一套相对繁芜的存储编码用来将工具序列化,随后再将一个非定长的数据落到上述的真实的写入到文件中,值得把稳的存储格式中包括了索引行列步队的编号和位置。
存储时单条本身元数据占用的存储空间为固定的 91B + 部分属性,而的 payload 常日大于 2K,也便是说元数据带来的额外存储开销只增加了 5%-10% 旁边。很明显,单条越大,存储本身额外的开销(比例)就相对的越少。但如果有大的诉求,例如想在 body 中保存一张序列化后的图片(二进制大工具),从目前的实现上说,在中保存引用,将真实数据保存到到其他组件,消费时读取引用(比如文件名或者 uk)实在是一个更得当的设计。
2.2 多条的连续写
上文提到,不同 Topic 的数据是直接稠浊追加数据到 CommitLog 中 (也便是上文提到的 MappedFileQueue),再交由其他后端线程做分发。实在我以为 RocketMQ 这种 CommitLog 与元数据的分开管理的机制也有一些 PacificaA (微软提出的复制框架) 的影子,从而以一种更大略的办法实现强同等。这里的强同等指的是在 Master Broker (对应于 PacificA 的 Primary) 对所有的持久化进行定序,再通过全序广播 (total order broadcast) 实现线性同等 (Linearizability)。这几种实现都会须要办理两个类似的问题,一是如何实现单机下的顺序写,二是如何加快写入的速率。
如果是副本组是异步多写的(高性能中可靠性),将日志非最新(水位最高)的备选为主,主备的数据日志可能会产生分叉。在 RocketMQ 5.0 中,主备会通过基于版本的协商机制,利用掉队补齐,截断未提交数据等办法来担保数据的同等性。顺便一提,RocketMQ 5.0 中实现了 logic queue 方案办理全局分区数变革的问题,这和 PacificaA 中通过 new-seal 新增副本组和分片 merge 给打算层读的一些优化策略有一些异曲同工之妙,详细可以参考这个设计方案[10]。
2.2.1 独占锁实现顺序写
如何担保单机存储写 CommitLog 的顺序性,直不雅观的想法便是对写入动作加独占锁保护,即同一时候只许可一个线程加锁成功,那么该选什么样的锁实现才得当呢?RocketMQ 目前实现了两种办法。1. 基于 AQS 的 ReentrantLock 2. 基于 CAS 的 SpinLock。
那么什么时候选取 spinlock,什么时候选取 reentranlock?回顾下两种锁的实现,对付 ReentrantLock,底层 AQS 抢不到锁的话会休眠,但是 SpinLock 会一贯抢锁,造成明显的 CPU 占用。SpinLock 在 trylock 失落败时,可以预期持有锁的线程会很快退出临界区,去世循环的忙等待很可能要比进程挂起等待更高效。这也是为什么在高并发下为了保持 CPU 平稳占用而采取办法一,单次要求相应韶光短的场景下采取办法二能够减少 CPU 开销。两种实实际用锁内操作韶光不同的场景,那线程拿到锁之后须要进行哪些动作呢?
估量算索引的位置,即 ConsumeQueueOffset,这个值也须要担保严格递增。打算在 CommitLog 存储的位置,physicalOffset 物理偏移量,也便是全局文件的位置。记录存储韶光戳 storeTimestamp,紧张是为了担保投递的韶光严格保序。因此不少文章也会建议在同步持久化的时候采取 ReentrantLock,异步持久化的时候采取 SpinLock。那么这个地方还有没有优化的空间?目前可以考虑利用较新的 futex 取代 spinlock 机制。futex 掩护了一个内核层的等待行列步队和许多个 SpinLock 链表。当得到锁时,考试测验 cas 修正,如果成功则得到锁,否则就将当前哨程 uaddr hash 放入到等待行列步队 (wait queue),分散对等待行列步队的竞争,减小单个行列步队的长度。这听起来是不是也有一点点 concurrentHashMap 和 LongAddr 的味道,实在核心思想都是类似的,即分散竞争。
2.2.2 成组提交与可见性
受限于磁盘IO,块存储的相应常日非常慢。哀求所有要求立即持久化是不可能的,为了提升性能,大部分的系统总是将操作日志缓存到内存中,比如在知足"日志缓冲区中数据量超过一定大小 / 间隔上次刷入磁盘超过一定韶光" 的任一条件时,通过后台线程定期持久化操作日志。但这种成组提交的做法有一个很大的问题,存储系统意外故障时,会丢失末了一部分更新操作。例如数据库引擎总是哀求先将操作日志刷入磁盘 (优先写入 redo log) 才能更新内存中的数据,这样断电重启则可以通过 undo log 进行事务回滚与丢弃。
在系统的实现上有一些奇妙的不同,不同场景下对的可靠性哀求不同,在金融云场景下可能哀求主备都同步持久化完成才对下贱可见,但日志场景希望尽可能低的延迟,同时许可故障场景少量丢失。此时可以将 RocketMQ 配置为单主异步持久化来提高性能,降落本钱。此时宕机,存储层会丢失末了一小段没保存的,而下贱的消费者实际上已经收到了。当下贱的消费者重置位点到一个更早的韶光,回放至同样位点的时候,只能读取到了新写入的,但读取不到之前消费过的(相同位点的不是同一条),这是一种 read uncommitted。
这样会有什么问题呢?对付普通来说,由于这条已经被下贱处理,最坏的影响是重置位点时无法消费到。但是对付 Flink 这样的流打算框架,以 RocketMQ 作为 Source 的时候,通过回放最近一次 CheckPoint 到当前的数据的 offset 来实现高可用,不可重复读会造成打算系统没法做到精确的 excatly once 消费,打算的结果也就禁绝确了。相应的办理的方案之一是在副本组多数派确认的时候才构建被消费者可见的索引,这么做宏不雅观上的影响便是写入的延迟增加了,这也可以从另一个角度解读为隔离级别的提升带来的代价。
对付权衡延迟和吞吐量这个问题,可以通过加快主备复制速率,改变复制的协议等手段来优化,这里大家可以看下 SIGMOD 2022 关于 Kafka 运行在 RDMA 网络上显著降落延迟的论文《KafkaDirect: Zero-copy Data Access for Apache Kafka over RDMA Networks》链接[9]。
2.3 持久化机制
关于这一块的谈论在社区里谈论是最多的,不少文章都把持久化机制称为刷盘。我不喜好这个词,由于它不准确。在 RocketMQ 中供应了三种办法来持久化,对应了三个不同的线程实现,实际利用中只会选择一个。
同步持久化,利用 GroupCommitService。异步持久化且未开启 TransientStorePool 缓存,利用 FlushRealTimeService。异步持久化且开启 TransientStorePool 缓存,利用 CommitRealService。2.3.1 持久化
同步刷盘的落盘线程统一都是 GroupCommitService。写入线程仅仅卖力唤醒落盘线程,将转交给存储线程,而不会等待存储完成之后就急速返回了。我个人对这个设计的理解是,写入线程相对与存储线程来说也可以看作 IO 线程,而真实存储的线程须要攒批持久化会陷入中断,以是才要大费周章的做转交。
从同步刷盘的实现看,落盘线程每隔 10 ms 会检讨一次,如果有数据未持久化,便将 page cache 中的数据刷入磁盘。此时操作系统 crash 或者断电,那未落盘的数据丢失会不会对生产者有影响呢?此时生产者只要利用了可靠发送 (指非 oneway 的 rpc 调用),这时对付发送者来说还没有收到成功的相应,此时客户端会进行重试,将写入其他可用的节点。
异步持久化对应的线程是 FlushRealTimeService,实现上又分为固定频率和非固定频率,核心差异是线程是否相应中断。所谓的固定频率是指每次有新的到来的时候不管,不相应中断,每隔 500ms(可配置)flush 一次,如果创造未落盘数据不敷(默认 16K),直接进入下一个循环,如果数据写入量很少,一贯没有添补斥16K,就不会落盘了吗?这里还有一个基于韶光的兜底方案,即线程创造间隔上次写入已经良久了(默认 10 秒),也会实行一次 flush。但事实上 FileChannel 还是 MappedByteBuffer 的 force() 方法都不能精确掌握写入的数据量,这里的写行为也只是对内核的一种建议。对付非固定频率实现,即每次有新的到来的时候,都会发送唤醒旗子暗记,当唤醒动作在数据量较大时,存在性能损耗,但量较少且情形下实时性好,更省资源。在生产中,详细选择哪种持久化实现由详细的场景决定。是同步写还是多副本异步写来担保数据存储的可靠性,实质上是读写延迟和和本钱之间的权衡。
2.3.2 读写分离
广义上来说,读写分离这个名词有两个不同的含义:
像数据库一样主写从读,分摊读压力,捐躯延迟可靠性更高,适用于读写比非常高的场景。
存储写入将暂存至 DirectByteBuffer,当数据成功写入后,再归还给缓冲池,将写入 page cache 的动作异步化。
这里紧张来谈论第二点,当 Broker 配置异步持久化且开启缓冲池,启用的异步刷盘线程是 CommitRealTimeService。我们知道操作系统本身一样平常是当 page cache 上积累了大量脏页后才会触发一次 flush 动作(由一些 vm 参数掌握,比如 dirty_background_ratio 和 dirty_ratio)。这里有一个很故意思的说法是 CPU 的 cache 是由硬件掩护同等性,而 page cache 须要由软件来掩护,也被称为 syncable。高并发下写入 page cache 可能会造成刷脏页时磁盘压力较高,导致写入时涌现毛刺征象。为理解决这个问题,涌现了读写分离的实现,利用堆外内存将 hold 住,然后进行异步批量写入。
RocketMQ 启动时会默认初始化 5 块(参数 transientStorePoolSize 决定)堆外内存(DirectByteBuffer)循环利用,由于复用堆外内存,这个小方案也被成为池化,池化的好处及弊端如下:
好处:数据写堆外后便很快返回,减少了用户态与内核态的切换开销。弊端:数据可靠性降为最低级别,进程重启就会丢数据(当然这里一样平常合营多副本机制进行保障),也会增加一些端到真个延迟。2.3.3 宕机与故障规复
宕机一样平常是由于底层的硬件问题导致,RocketMQ 宕机后如果磁盘没有永久故障,一样平常只须要原地重启,Broker 首先会进行存储状态的规复,加载 CommitLog,ConsumeQueue 到内存,完成 HA 协商,末了初始化 Netty Server 供应做事。目前的实现是末了初始化对用户可见的网络层做事,实际上这里也可以先初始化网络库,分批将 Topic 注册到 NameServer,这样正常升级时可以对用户的影响更小。
在 recover 的过程中还有很多软件工程实现上的细节,比如从块设备加载的时候须要校验的 crc 看是否产生缺点,对末了一小段未确认的进行 dispatch 等操作。默认从倒数第三个文件 recover CommitLog 加载到 page cache (假设未持久化的数据 < 3G),防止一上线由于客户端要求的不在内存,导致猖獗的缺页中断壅塞线程。分布式场景下还须要对存储的数据掩护同等性,这也就涉及到日志的截断,同步和回发等问题,后续我将在高可用篇再详细谈论这一点。
2.4 文件的生命周期
聊完了的生产保存,再来谈论下的生命周期,只要磁盘没有满,可以长期保存。前面提到 RocketMQ 将稠浊保存在 CommitLog,对付和流这样近似 FIFO 的系统来说,越近期的代价越高,以是默认以滚动的形式从前向后删除最久远的,而不会关注文件上的是否全部被消费。触发文件打消操作的是一个定时任务,默认每 10s 实行一次。在一次定时任务触发时,可能会有多个物理文件超过过期韶光可被删除,因此删除一个文件不但要判断这个文件是否还被利用,还须要间隔一定韶光(参数 deletePhysicFilesInterval)再删除其余一个文件,由于删除文件是一个非常耗费 IO 的操作,可能会引起存储抖动,导致新写入和消费的延迟。以是又新增了一个定时删除的能力,利用 deleteWhen 配置操作韶光(默认是凌晨4点)。我们把由于磁盘空间不敷导致的删除称为被动行为,由于高速介质常日比较贵(傲腾 ESSD等),出于本钱考虑,我们还会异步的主动的将热数据转移到二级介质上。在一些分外的场景下,删除的同时可能还须要对磁盘做安全擦除来防止数据规复。
2.5 避免存储抖动
2.5.1 快速失落败
被做事端 Netty 的 IO 线程读取后就会进入到壅塞行列步队中排队,而单个 Broker 节点有时会由于 GC,IO 抖动等成分造成短时存储写失落败。如果要求来不及处理,排队的要求就会越积越多导致 OOM,客户端视角看从发送到收到做事端相应的韶光大大延长,终极发送超时。RocketMQ 为了缓解这种抖动问题,引入了快速失落败机制,即开启一个扫描线程,不断的去检讨行列步队中的第一个排队节点,如果该节点的排队韶光已经超过了 200ms,就会拿出这个要求,立即向客户端返回失落败,客户端会重试到其他副本组(客户端还有一些熔断与隔离机制),实现整体做事的高可用。
存储系统不止是被动的感知一些下层缘故原由导致的失落败,RocketMQ 还设计了很多大略有效的算法来进行主动估算。例如写入时 RocketMQ 想要判断操作系统的 page cache 是否繁忙,但是 JVM 本身没有供应这样的 Monitor 工具来评估 page cache 繁忙程度,于是利用系统的处理韶光来判断写入是否超过1秒,如果超时的话,让新要求会快速失落败。再比如客户端消费时会判断当前主的内存利用率比较高,大于物理内存的 40%时,就会建议客户端从备机拉取消息。
2.5.2 预分配与文件预热为了在 CommitLog 写满之后快速的切换物理文件,后台利用一个后台线程异步创建新的文件并进行对进行内存锁定,还大费周章的设计了一个额外文件预热开关(配置 warmMapedFileEnable),这么做紧张有两个缘故原由:
要求分配内存并进行 mlock 系统调用后并不一定会为进程完备锁定这些物理内存,此时的内存分页可能是写时复制的。此时须要向每个内存页中写入一些假的值,有些固态的主控可能会对数据压缩,以是这里不会写入 0。
调用 mmap 进行映射后,OS 只是建立虚拟内存地址至物理地址的映射表,而实际并没有加载任何文件至内存中。这里可能会有大量缺页中断。RocketMQ 在做 mmap 内存映射的同时进行 madvise 调用,同时向 OS 表明 WILLNEED 的意愿。使 OS 做一次内存映射后对应的文件数据尽可能多的预加载至内存中,从而达到内存预热的效果。
当然,这么做也是有弊端的。预热后,写文件的耗时缩短了很多,但预热本身就会带来一些写放大。整体来看,这么做能在一定程度长进步相应韶光的稳定性,减少毛刺征象,但在 IO 本身压力很高的情形下则不建议开启。RocketMQ 是适用于 Topic 数量较多的业务场景。以是 RocketMQ 采取了和 Kafka 不一样的零拷贝方案,Kafka 采取的是壅塞式 IO 进行 sendfile,适用于系统日志这种高吞吐量的大块文件。而 RocketMQ 选择了 mmap + write 非壅塞式 IO (基于多路复用) 作为零拷贝办法,这是由于 RocketMQ 定位于业务级这种小数据块/高频率的 IO 传输,当想要更低的延迟的时候选择 mmap 更得当。
当 kernal 把可用的内存分配后 free 的内存就不足了,如果进程一下产生大量的新分配需求或者缺页中断,还须要将通过淘汰算法进行内存回收,此时可能会产生抖动,写入会有短时的毛刺征象。
2.5.3 冷数据读取
对付 RocketMQ 来说,读取冷数据可能有两种情形。
要求来自于这个副本组的其他节点,进行副本组内的数据复制,也可能是离线转储到其他系统。
要求来自于客户端,是消费者来消费几个小时以前的数据,属于正常的业务诉求。
对付第一种情形,在 RocketMQ 低版本源码中,对付须要大量复制 CommitLog 的情形(例如备磁盘故障,或新上线一个备机),主默认利用 DMA 拷贝的形式将数据直接通过网络复制给备机,此时由于大量的缺页中断壅塞了 io 线程,此时会影响 Netty 处理新的要求,在实现上让一些组件之间的内部通信利用 fastRemoting 供应的第二个端口,办理这个问题的临时方案还包括先用业务线程将数据 load 回内存而不该用零拷贝,但这个做法没有从实质上办理壅塞的问题。对付冷拷贝的情形,可以利用 madvice 建议 os 读取避免影响主的写入,也可以从其他备复制数据。
对付第二种情形,对各个存储产品来说都是一个寻衅,客户端消费一条时,热数据全部存储在 page cache,对付冷数据会退化为随机读(系统会有一个对 page cache 连续读的预测机制)。须要消费超过几个小时之前的数据的场景下,消费者一样平常都是做数据剖析或者离线任务,此时下贱的目标都是吞吐量优先而非延迟。对付 RocketMQ 来说有两个比较好的办理方案,第一是同 redirect 的办法将读取要求转发给备进行分摊读压力,或者是从转储后的二级介质读取。在数据转储后,RocketMQ 本身的数据存储格式会发生变革,详见后文。
索引数据管理在数据写入 CommitLog 后,在做事端当 MessageStore 向 CommitLog 写入一些后,有一个后真个 ReputMessageService 做事 (dispatch 线程) 会异步的构建多种索引,知足不同形式的读取诉求。
3.1 行列步队维度的有序索引 ConsumeQueue
在 RocketMQ 的模型下,本身存在的逻辑行列步队称为 MessageQueue,而对应的物理索引文件称为 ConsumeQueue。从某种意义上说 MessageQueue = 多个连续 ConsumeQueue 索引 + CommitLog 文件。ConsumeQueue 相对与 CommitLog 来说是一个更加轻量。dispatch 线程会源源不断的将从 CommitLog 取出,再拿出在 CommitLog 中的物理偏移量 (相对付文件存储的 Index),长度以及Tag Hash 作为单条的索引,分发到对应的消费行列步队。偏移 + 长度构成了对 CommitLog 的引用 (Ref)。这种 Ref 机制对付单挑只有 20B,显著降落了索引存储开销。ConsumeQueue 实际写入的实现与 CommitLog 不同,CommitLog 有很多存储策略可以选择且稠浊存储,一个 ConsumeQueue 只会保存一个 Topic 的一个分区的索引,持久化默认利用 FileChannel,实际上这里利用 mmap 的话对小数据量的要求更加友好,不用陷入中断。
客户真个 pull 要求到做事端实行了如下流程来查询:
根据 Tag 的 Hash 值查询 ConsumeQueue 文件(由 physicOffset + size + Tag HashCode 组成)根据 ConsumeQueue 拿到 physicOffset + size根据 physicOffset 查询 CommitLog 文件(上文的 MappedFileQueue ) 得到RocketMQ中默认指定每个消费行列步队的文件存储 30 万条索引,而一个索引占用 20 个字节,这样每个文件的大小是 300 1000 20 / 1024 / 1024 ≈ 5.72M。为什么消费行列步队文件存储的个数要设置成 30 万呢?这个履历值适宜量比较大的场景,事实上这个值对付大部分场景来说是偏大的,有效数据的真实占用率很低,导致ConsumeQueue 空载率高。先来看看如果过大或者过小会带来什么问题。由于总是有失落效期的,例如 3 天失落效,如果消费行列步队的文件设置过大的话,有可能一个文件中包含了过去一个月的索引,但这个时候原始的数据已经滚动没了,白白摧残浪费蹂躏了很多空间。但也不宜太小,导致 ConsumeQueue 也有大量小文件,降落读写性能。下面给出一个非严谨的空载率推导过程:
假设此时单机的 Topic = 5000,单节点单个 Topic 的行列步队数一样平常是 8,分区数量 = 4万。以 1T 数据为例,每条大小是 4KB,索引数量 = 数量 = 1024 1024 1024 / 4 = 2.68 亿。最少须要的 ConsumeQueue = 索引数量 / 30万 = 895 个,实际利用率 (有效数据量) 约即是 2.4%。随着 ConsumeQueue Offset 的原子自增滚动, cq 头部是无效数据导致占用的磁盘空间会变大。根据公有云线上的情形来看,非 0 数据约占 5%,实际有效数据只占 1%。对付 ConsumeQueue 这样的索引文件,我们可以利用 RocksDB 或者傲腾这样的持久化内存来存储,或者对 ConsumeQueue 单独实现一个用户态文件系统,几个方案都可以减少整体索引文件大小,提高访问性能。这一点在后文关于存储机制的优化中,我们再详聊。
由于 CommitLog - ConsumerQueue - Offset 的关系从写入的那一刻开始就确定了,在 Topic 跨副本组迁移,副本组要下线等须要切流的场景下,如果须要可读,须要采取复制数据的方案来实现 Topic 跨副本组迁移,只能采取级别的拷贝,而不能大略的把一个分区从副本组 A 移动到副本组 B。有一些产品在面对这个场景时,采取了数据按分区复制的方案,这种方案可能会急速产生大量的数据传输(分区 rebalance),而 RocketMQ 的切流一样平常可以做到秒级生效。
3.2 维度的随机索引 IndexFile
RocketMQ 作为业务的首选,上文中 ReputMessageService 线程除了构建消费行列步队的索引外,还同时为每条根据 id, key 构建了索引到 IndexFile。这是方便快速快速定位目标而产生的,当然这个构建随机索引的能力是可以降级的,IndexFile文件构造如下:
IndexFile 也是定长的,从单个文件的数据构造来说,这是实现了一种大略原生的哈希拉链机制。当一条新的索引进来时,首先利用 hash 算法命中黄色部分 500w 个 slot 中的一个,如果存在冲突就利用拉链办理,将最新索引数据的 next 指向上一条索引位置。同时将的索引数据 append 至文件尾部(绿色部分),这样便形成了一条当前 slot 按照韶光存入的倒序的链表。这里实在也是一种 LSM compaction 在模型下的改进,降落了写放大。
存储机制的演进方向
RocketMQ 的存储设计因此大略可靠行列步队模型作为核心来抽象的,也因此产生了一些毛病和对应的优化方案。
4.1 KV 模型与 Queue 模型结合
RocketMQ 实现了单条业务的退避重试,在生产实践中,我们创造部分用户在客户端消费限流时直接将返回失落败,在重试量比较大的时候,由于原有实现下重试行列步队数有限,导致重试无法很好的负载均衡到所有客户端。同时,来回的在做事端和客户端之间传输,使得两侧的开销都增加了,用户侧精确的做法该当是消费限流时,让消费的线程等待一下子。从存储做事的角度上来说,这实在是一种行列步队模型的不敷,让一条行列步队只能被一个消费者持有。RocketMQ 提出了 pop 消费这种全新的观点,让单条行列步队的能够被多个客户端消费到,这涉及到做事端对单条的加解锁,KV 模型就非常契合这个场景。从长远来看,像定时事务可以有一些基于 KV 的更原生的实现,这也是 RocketMQ 未来努力的方向之一。
4.2 的压缩与归档存储
压缩便是用韶光去换空间的经典 trade-off,希望以较小的 CPU 开销带来更少的磁盘占用或更少的网络 I/O 传输。目前 RocketMQ 客户端从延迟考虑仅单条大于 4K 的进行单条压缩存储的。做事端对付收到的没有急速进行压缩存储有多个缘故原由,例如为了担保数据能够及时的写入磁盘,稀疏的时候攒批效果比较差等,以是 Body 没有压缩存储。而对付大部分的业务 Topic 来说,实在 Body 一样平常都有很大程度上是相似的,可以压缩到原来的几分之一到几十分之一。
存储一样平常有高速(高频)介质与低速介质,热数据存放在高频介质上(如傲腾,ESSD,SSD),冷数据存放在低频介质上(NAS,OSS),以此来知足低本钱保存更久的数据。从高频介质转到更低频的 NAS 或者 OSS 时,不可避免的产生了一次数据拷贝。我们可以在这个过程中异步的对数据进行规整(闲时资源富余)。那么我们为什么要做规整呢,直接零拷贝复制不喷鼻香吗?答案便是低频介质虽然便宜大碗,但常日 iops 和吞吐量更低。对付 RocketMQ 来说须要规整的数据便是索引和 CommitLog 中的,也便是说在高频介质与低频介质上的存储格式可以是完备不同的。当热降级到二级存储的时候,数据密集且异步,这里便是一个非常得当的机会进行压缩和规整。业界也有一些基于 FPGA 来加速存储压缩的案例,将来我们也会持续的做这方面的考试测验。
4.3 存储层资源共享与争抢
4.3.1 磁盘 IO 的抢占
没错,这里想谈谈的实在是硬盘的调度算法。在一个考虑性价比的场景下,由于 RocketMQ 的存储机制,我们可以把索引文件存储在 SSD,本身放在 HDD 里,由于热总是在 PageCache 中的,以是在 IO 调度上优先知足写而饿去世读。对付没有堆积的消费者来说,消费到的数据是从 page cache 拷贝到 socket 再传输给用户,实时性已经很高了。而对付消费冷数据(几个小时,几天以前的数据)用户的诉求一样平常是尽快获取到即可,此时做事端可以选择尽快知足用户的 Pull 要求,由于大量的随机 IO,这样磁盘会产生严重的 rt 抖动。仔细考虑,这里其实用户想要的是尽可能大的吞吐量,假设访问冷数据须要 200 毫秒,假设在做事端把冷读的行为滞后,再加上延迟 500 毫秒再返回给用户数据,并没有显著的差异。而这里的 500 毫秒,做事端内部就可以合并大量的 IO 操作,我们也可以利用 madvice 系统调用去建议内核读取。这里的合并带来的收益很高,可以显著的减少对热数据的写入的影响,大幅度提升性能。
4.3.2 用户态文件系统
还是为理解决随机读效率低的问题,我们可以设计一个用户态文件系统,让 IO 调用全部 kernel-bypass。
紧张有几个方向:
多点挂载。常用的 Ext4 等文件系统不支持多点挂载,让存储能够支持多个实例的对同一份数据的共享访问。调度对付 IO 的合并策略,IO优先级,polling 模式,行列步队深度等。利用文件系统类似 O_DIRECT 的非缓存办法读写数据。RocketMQ 的未来RocketMQ 存储系统经由多年的发展,基本功能特性已经比较完善,通过一系列的创新技能办理了分布式存储系统中的难题,稳定的做事于阿里集团和海量的云上用户。RocketMQ 在云原生时期的演进中碰着了更多的有趣的场景和寻衅,这是一个须要全链路调优的繁芜工程。我们会持续在规模,稳定性,多活容灾等企业级特性,本钱与弹性等方面发力,将 RocketMQ 打造为“,事宜,流”一体化的领悟平台。同时,我们也会将开源行动更加可持续的发展下去,为社会创造代价。(团队:阿里云智能-GTS)
参考文献[1]. 深入理解 Linux 中的page cache https://www.jianshu.com/p/ae741edd682c[2]. PacificA: Replication in Log-Based Distributed Storage Systems. https://www.microsoft.com/en-us/research/wp-content/uploads/2008/02/tr-2008-25.pdf[3]. J. DeBrabant, A. Pavlo, S. Tu, M. Stonebraker, and S. B. Zdonik. Anti-caching: A new approach to database management system architecture. PVLDB, 6(14):1942–1953, 2013.[4]. 《RocketMQ 技能底细》[5]. 同等性协议中的“幽灵复现”. https://zhuanlan.zhihu.com/p/47025699.[6]. Calder B, Wang J, Ogus A, et al. Windows Azure Storage: a highly available cloud storage service with strong consistency[C]//Proceedings of the Twenty-Third ACM Symposium on Operating Systems Principles. ACM, 2011: 143-157.[7]. Chen Z, Cong G, Aref W G. STAR: A distributed stream warehouse system for spatial data[C] 2020: 2761-2764.[8]. design data-intensive application 《构建数据密集型运用》[9]https://spcl.inf.ethz.ch/Research/Parallel_Programming/KafkaDirect/KafkaDirect.pdf[10]https://github.com/apache/rocketmq/blob/5.0.0-alpha/docs/cn/statictopic/RocketMQ_Static_Topic_Logic_Queue_设计.md
作者 | 斜阳
原文链接:http://click.aliyun.com/m/1000347782/
本文为阿里云原创内容,未经许可不得转载。