随着 RocketMQ 5.1.0 的正式发布,多级存储作为 RocketMQ 一个新的独立模块到达了 Technical Preview 里程碑:允许用户将消息从本地磁盘卸载到其他更便宜的存储介质,可以用较低的成本延长消息保留时间。本文详细介绍 RocketMQ 多级存储设计与实现。
RocketMQ 多级存储旨在不影响热数据读写的前提下将数据卸载到其他存储介质中,适用于两种场景:
RocketMQ 多级存储对比 Kafka 和 Pulsar 的实现最大的不同是我们使用准实时的方式上传消息,而不是等一个 CommitLog 写满后再上传,主要基于以下几点考虑:
多级存储在设计上希望降低用户心智负担:用户无需变更客户端就能实现无感切换冷热数据读写链路,通过简单的修改服务端配置即可具备多级存储的能力,只需以下两步:
可选项:支持修改 tieredMetadataServiceProvider 切换元数据存储的实现,默认是基于 json 的文件存储
更多使用说明和配置项可以在 GitHub 上查看多级存储的 README[1]
architecture
接入层:TieredMessageStore/TieredDispatcher/TieredMessageFetcher
接入层实现 MessageStore 中的部分读写接口,并为他们增加了异步语意。TieredDispatcher 和 TieredMessageFetcher 分别实现了多级存储的上传/下载逻辑,相比于底层接口这里做了较多的性能优化:包括使用独立的线程池,避免慢 IO 阻塞访问热数据;使用预读缓存优化性能等。
容器层:TieredCommitLog/TieredConsumeQueue/TieredIndexFile/TieredFileQueue
容器层实现了和 DefaultMessageStore 类似的逻辑文件抽象,同样将文件划分为 CommitLog、ConsumeQueue、IndexFile,并且每种逻辑文件类型都通过 FileQueue 持有底层物理文件的引用。有所不同的是多级存储的 CommitLog 改为 queue 维度。
驱动层:TieredFileSegment
驱动层负责维护逻辑文件到物理文件的映射,通过实现 TieredStoreProvider 对接底层文件系统读写接口(Posix、S3、OSS、MinIO 等)。目前提供了 PosixFileSegment 的实现,可以将数据转移到其他硬盘或通过 fuse 挂载的对象存储上。
RocketMQ 多级存储的消息上传是由 dispatch 机制触发的:初始化多级存储时会将 TieredDispatcher 注册为 CommitLog 的 dispacher。这样每当有消息发送到 Broker 会调用 TieredDispatcher 进行消息分发,TieredDispatcher 将该消息写入到 upload buffer 后立即返回成功。整个 dispatch 流程中不会有任何阻塞逻辑,确保不会影响本地 ConsumeQueue 的构建。
TieredDispatcher
TieredDispatcher 写入 upload buffer 的内容仅为消息的引用,不会将消息的 body 读入内存。因为多级储存以 queue 维度构建 CommitLog,此时需要重新生成 commitLog offset 字段。
upload buffer
触发 upload buffer 上传时读取到每条消息的 commitLog offset 字段时采用拼接的方式将新的 offset 嵌入到原消息中。
每个队列都会有两个关键位点控制上传进度:
upload progress
类比消费者,dispatch offset 相当于拉取消息的位点,commit offset 相当于确认消费的位点。commit offset 到 dispatch offset 之间的部分相当于已拉取未消费的消息。
TieredMessageStore 实现了 MessageStore 中的消息读取相关接口,通过请求中的逻辑位点(queue offset)判断是否从多级存储中读取消息,根据配置(tieredStorageLevel)有四种策略:
/**
* Asynchronous get message
* @see #getMessage(String, String, int, long, int, MessageFilter)
getMessage
*
* @param group Consumer group that launches this query.
* @param topic Topic to query.
* @param queueId Queue ID to query.
* @param offset Logical offset to start from.
* @param maxMsgNums Maximum count of messages to query.
* @param messageFilter Message filter used to screen desired
messages.
* @return Matched messages.
*/
CompletableFuture<GetMessageResult> getMessageAsync(final String group, final String topic, final int queueId,
final long offset, final int maxMsgNums, final MessageFilter
messageFilter);
需要从多级存储中读取的消息会交由 TieredMessageFetcher 处理:首先校验参数是否合法,然后按照逻辑位点(queue offset)发起拉取请求。TieredConsumeQueue/TieredCommitLog 将逻辑位点换算为对应文件的物理位点从 TieredFileSegment 读取消息。
// TieredMessageFetcher#getMessageAsync similar with
TieredMessageStore#getMessageAsync
public CompletableFuture<GetMessageResult> getMessageAsync(String
group, String topic, int queueId,
long queueOffset, int maxMsgNums, final MessageFilter
messageFilter)
TieredFileSegment 维护每个储存在文件系统中的物理文件位点,并通过为不同存储介质实现的接口从中读取所需的数据。
/**
* Get data from backend file system
*
* @param position the index from where the file will be read
* @param length the data size will be read
* @return data to be read
*/
CompletableFuture<ByteBuffer> read0(long position, int length);
TieredMessageFetcher 读取消息时会预读一部分消息供下次使用,这些消息暂存在预读缓存中。
protected final Cache<MessageCacheKey /* topic, queue id and queue
offset */,
SelectMappedBufferResultWrapper /* message data */> readAheadCache;
预读缓存的设计参考了 TCP Tahoe 拥塞控制算法,每次预读的消息量类似拥塞窗口采用加法增、乘法减的机制控制:
预读缓存支持在读取消息量较大时分片并发请求,以取得更大带宽和更小的延迟。
某个 topic 消息的预读缓存由消费这个 topic 的所有 group 共享,缓存失效策略为:
本文系作者在时代Java发表,未经许可,不得转载。
如有侵权,请联系nowjava@qq.com删除。