ClickHouse内核分析系列文章,本文将为大家深度解读Zookeeper在ClickHouse集群中的作用,目前和Zookeeper密切相关的功能包括分布式DDL执行和ReplicatedMergeTree表引擎。最近碰到了很多同学询问和Zookeeper相关的问题,希望通过本文大家可以深刻理解ClickHouse用Zookeeper到底解决哪些问题。正文将会为大家依次介绍分布式DDL执行和ReplicatedMergeTree表引擎依赖的实现细节,建议读者先补充系列文章中关于MergeTree表引擎的前两篇文章,这样会比较容易理解ReplicatedMergeTree表引擎。ReplicatedMergeTree表引擎中的主备同步完全依赖Zookeeper,并且逻辑十分复杂,本文只能为大家呈现一个大体的逻辑链路。
Zookeeper作为一个分布式一致性存储服务,提供了丰富的读写接口和watch机制,分布式应用基于Zookeeper可以解决很多常见问题,例如心跳管理、主备切换、分布式锁等。建议对Zookeeper完全没有了解的同学先补充一些Zookeeper的基本概念再来读本文。
ClickHouse中依赖Zookeeper解决的问题可以分为两大类:分布式DDL执行、ReplicatedMergeTree表主备节点之间的状态同步。
分布式DDL执行:ClickHouse中DDL执行默认不是分布式化的,用户需要在DDL语句中加上on Cluster XXX的申明才能触发这个功能。和其他完全分布式化的数据库不同,ClickHouse对库、表的管理都是在存储节点级别独立的,集群中各节点之间的库、表元数据信息没有一致性约束。这是由ClickHouse的架构特色决定的:1)彻底Share Nothing,各节点之间完全没有相互依赖;2)节点完全对等,集群中的节点角色统一,ClickHouse没有传统MPP数据库中的前端节点、Worker节点、元数据节点等概念。ClickHouse的这种架构特色决定它可以敏捷化、小规模部署,集群可以任意进行分裂、合并,当然前提要求是感知数据在集群节点上的分布。在ClickHouse的架构形态下,用户可以直接连接任意一个节点进行请求,当用户发送DDL命令时,默认只会在当前连接的节点执行命令。现实中如果用户有一个100台机器的集群,为了创建一个分布式存储的表难道用户需要依次连接每台机器发送DDL命令吗?这会让用户抓狂的,并且存在多个DDL之间的冲突问题无法解决:用户A和用户B同时创建同名表但是表字段又不一致,这肯定会让系统陷入一个诡异的不一致状态。这个就是分布式DDL执行要解决的问题了,ClickHouse集群的每个节点都会把收到的分布式执行DDL请求放入到一个公共的Zookeeper任务队列中,然后每个节点的后台线程会依次任务队列里的DDL,保证了所有分布式DDL的串行执行顺序。
主备节点状态同步:ClickHouse集群化部署中有三个逻辑概念需要先展开介绍一下Cluster、Shard和Replicate,这三者都是ClickHouse在集群节点资源规划上的概念。一个集群可以包括若干个Cluster,一个Cluster可以包括若干个Shard,一个Shard又可以包含若干个Replicate,一个Replicate就是一个特定的节点实例,用户可以通过ClickHouse启动的config.xml来配置这套节点规划逻辑。基于这套逻辑,用户可以把一个集群规划成若干个Cluster,每个Cluster可自定义Shard数量,每个Shard又可以自定义副本数量。这三个概念只作用于资源规划上,单个存储节点内部不同Cluster之间的表都是相互可见的。在数据分析在线化的大趋势下,用户的分析场景对RT和QPS有越来越高的要求。降低RT的一个核心能力是自定义表的Shard数量(Scale Out),传统的MPP数据也都有这个能力。而提升QPS的一个核心能力是自定义表的Replicate数量,传统的MPP数据库都没有表级别的自定义副本数能力,只能做全库的副本数配置。ClickHouse能做到表的Replicate数量自定义技术核心是它把主备同步逻辑放到了具体的表引擎中实现,而不是在节点级别做数据复制。当前只有ReplicatedMergeTree表引擎可以自动做主备状态同步,其他表引擎没有状态同步机制。如果用户需要在多副本Cluster下创建其他表引擎,则需要在写入链路上配置多写逻辑。ReplicatedMergeTree表引擎的同步包括写入同步、异步Merge同步、异步Mutation同步等,它所有的同步逻辑都是强依赖Zookeeper。
在介绍具体的分布式DDL执行链路之前,先为大家梳理一下到底哪些操作是可以走分布式DDL执行链路的,大家也可以自己在源码中查看一下ASTQueryWithOnCluster的继承类有哪些:
ClickHouse内核对每种SQL操作都有对应的IInterpreter实现类,其中的execute方法负责具体的操作逻辑。而以上列举的ASTQuery对应的IInterpreter实现类中的execute方法都加入了分布式DDL执行判断逻辑,把所有分布式DDL执行链路统一都DDLWorker::executeDDLQueryOnCluster方法中。executeDDLQueryOnCluster的过程大致可以分为三个步骤:检查DDLQuery的合法性,把DDLQuery写入到Zookeeper任务队列中,等待Zookeeper任务队列的反馈把结果返回给用户。
检查Query合法性这块有一点值得注意:用户在当前session的database空间下执行一个分布式DDL命令,真实执行DDL操作的节点会在什么database下执行这个DDL呢?这里的逻辑是:1)优先使用DDL Query中指明的database,2)当DDL Query中没有指明database时,优先使用config.xml中的Cluster配置,每个Shard配置可以申明自己的default database,3)若前两者都没有,则使用当前session的database。
DDL Query的分发过程依赖Zookeeper,每一条需要分发的DDL Query转换成一个如下的DDL LogEntry,然后把LogEntry序列化成字符串保存到Zookeeper的任务队列中。LogEntry中包含了SQL信息,分布式执行目标Cluster对应的所有节点地址信息,LogEntry的生成者信息。Zookeeper的任务队列位置是在config.xml配置中统一配置的(用户可以让多个ClickHouse集群共用一套Zookeeper,默认路径为/clickhouse/task_queue/ddl)。ClickHouse都是利用Zookeeper序列自增节点(Sequence Znodes)的特性实现来任务队列,把每个DDL LogEntry保存为任务队列目录下的一个Persistent Sequential Znode,相当于对每个DDL Query赋予了一个集群自增的数字ID,在每个DDL LogEntry对应的Znode下面,还需要创建两个status节点:active Znode用来管理当前有多少节点正在执行这个DDL,finished Znode用来管理当前有多少节点以及完成这个DDL并收集返回的状态信息(包括Exception)。
struct DDLLogEntry
{
String query;
std::vector<HostID> hosts;
String initiator; // optional
static constexpr int CURRENT_VERSION = 1;
...
}
分布式DDL的执行链路如下图所示:
1)节点收到用户的分布式DDL请求;
2)节点校验分布式DDL请求合法性,在Zookeeper的任务队列中创建Znode并上传DDL LogEntry(示例中为query-0000000115),同时在LogEntry的Znode下创建active和finish两个状态同步的Znode;
3)Cluster中的节点后台线程消费Zookeeper中的LogEntry队列执行处理逻辑,处理过程中把自己注册到acitve Znode下,并把处理结果写回到finish Znode下;
4)用户的原始请求节点,不断轮询LogEntry Znode下的active和finish状态Znode,当目标节点全部执行完成任务或者触发超时逻辑时,用户就会获得结果反馈;
这个分发逻辑中有个值得注意的点:分布式DDL执行链路中有超时逻辑,如果触发超时用户将无法从客户端返回中确定最终执行结果,需要自己去Zookeeper上check节点返回结果(也可以通过system.zookeeper系统表查看)。每个节点只有一个后台线程在消费执行DDL任务,碰到某个DDL任务(典型的是optimize任务)执行时间很长时,会导致DDL任务队列积压从而产生大面积的超时反馈。
节点的后台线程在处理一个DDL LogEntry Task时,首先会检查自己是否在DDL LogEntry的目标hosts中,这样可以区分出不同Cluster上的DDL任务,在具体执行DDL之前把自己注册到active Znode下,执行完成DDL之后会把返回结果包括异常信息写回到finish Znode下。
具体的DDL任务执行逻辑还是复用单节点上的执行逻辑,节点之间在处理DDL任务时互不感知。但是在ReplicatedMergeTree表引擎上有一些差异,ReplicatedMergeTree表引擎上的Alter、Optimize、Truncate命令都只在主副本节点上执行,备副本节点拿到这类DDL任务时会直接丢弃掉,主副本节点在执行的过程中也会使用Zookeeper分布式锁锁住这个任务再执行。因为ReplicatedMergeTree表引擎上的数据修改链路有自己内部的一套机制保证主备相互同步,这样避免了破坏主备之间的同步逻辑。下一章会详细讲ReplicatedMergeTree表主备之间的同步问题。
每个节点后台除了一个DDL任务消费线程外,还有一个过期DDL任务清理线程。清理线程会根据DDL任务队列的容量以及过期时间来清理以及全部完成的任务,清理过程中依旧会使用基于Zookeeper实现的分布式锁进行保护。
用户请求节点会不断轮训DDL LogEntry Znode下的active Znode和finish Znode,拉取执行状态,随着轮训次数的增加线程不断增加sleep时间,最后等到超时或者全部节点完成任务才把统计信息返回给客户端。到这里整个分布式DDL执行链路就已经全部完成啦,可以看出Zookeeper在分布式DDL执行过程中主要充当DDL Task的分发、串行化执行、结果收集的一致性介质。分布式DDL功能对Zookeeper不会造成很大的性能压力,多个ClickHouse集群可以共享同一套Zookeeper来完成分布式DDL任务。最后ClickHouse虽然用Zookeeper解决了分布式DDL串行化执行的问题,但是目前还没有实现两阶段提交的逻辑,用户需要注意分布式DDL如果失败可能会导致节点间的状态不一致。
上一章介绍的分布式DDL功能对Zookeeper的依赖情况还是比较轻量级的,接下来介绍的ReplicatedMergeTree表引擎对Zookeeper的依赖可以说是所有表操作全方面的依赖,真实集群中大量的ReplicatedMergeTree表会对Zookeeper造成非常大的请求压力,需要用户关注Zookeeper的运维。
ReplicatedMergeTree表引擎实现的主备同步和传统主备同步有很大的差异:1)它不是一个(抢主,主节点执写入更新,备节点同步follow)的模型,ClickHouse的主节点和备节点都可以写,同步是双向的;2)它不是物理同步,ClickHouse没有基于物理文件的WAL;3)它的逻辑同步日志粒度是MergeTree的Data Part级别的(没有单条记录的同步日志),包含Data Part的增、删、改。ReplicatedMergeTree表的Data Part Log主要包含以下几类:
enum Type
{
EMPTY, /// Not used.
GET_PART, /// Get the part from another replica.
MERGE_PARTS, /// Merge the parts.
DROP_RANGE, /// Delete the parts in the specified partition in the specified number range.
CLEAR_COLUMN, /// Drop specific column from specified partition.
CLEAR_INDEX, /// Drop specific index from specified partition.
REPLACE_RANGE, /// Drop certain range of partitions and replace them by new ones
MUTATE_PART, /// Apply one or several mutations to the part.
};
这些类型的Log中部分只有主节点可以生成("MERGE_PARTS"),部分是主备节点都可以生成的("GET_PART")。"GET_PART"日志是节点数据写入时产生的,且主备节点都可以写,每个节点写入数据后上传一个"GET_PART"日志到Zookeeper通知其他副本节点从自己这里下载数据。这里大家可能会疑惑:既然主备节点都可以写入,那为什么主备节点不能独立进行merge或者mutation?我认为核心原因有两个:
1)降低代码逻辑复杂度,MergeTree表引擎有两类后台异步任务(Merge/Mutation),同时又有所有节点可写的设定,这两个逻辑融合到一起的话复杂度会爆炸,ClickHouse的内核实现中是把写入和异步动作的链路完全解耦开的。主节点负责分发各种异步任务到Zookeeper上的任务队列,Shard下的所有节点观察任务队列进行follow执行。当万一某个其他节点上的数据和主节点不一致无法完成某个异步任务时,还有保底方法是让它直接从主节点去下载完成merge / mutation的Data Part。
2)MergeTree结构的表引擎有众多的变种merge逻辑(ReplacingMergeTree、CollapsingMergeTree等),再加上异步mutation的机制,多副本之间独立merge / mutation的话,副本间的数据视图同步进度就会完全失控(用户可能需要停写很长时间再加上手动Optimize才能达到副本间一致)。
上一篇系列文章中,我介绍过MergeTree表对Data Part的管理方式,要实现基于Data Part Log的同步,首先要确保节点之间的Data Part有统一的命名体系,而决定Data Part命名的核心因素是每一批数据写入时被赋予的blockNumber(数据的写入版本号),ClickHouse的写入链路利用了Zookeeper来生成全局一致的blockNumber序列。其次为了数据一致性保证,ClickHouse把ReplacingMergeTree表引擎中的所有Data Parts都注册到了Zookeeper上(包括它们的列信息和checksum),最终本地数据都要以Zookeeper上的状态为准。
ReplicatedMergeTree的写入链路分为三步:
1)把数据写入到本地的临时Data Part中,
2)从Zookeeper上申请自增的blockNumber序列号,
3)commit临时Data
Part,生成一条"GET_PART"的同步日志上传到Zookeeper任务队列中。和分布式DDL执行任务队列不同,每一个ReplicatedMergeTree表引擎在Zookeeper上都有一个独立的Znode,这个Znode的路径可以在建表示配置。下图展示了示例dm_t_pecust_lab_part表的Znode目录结构,每个Shard和Replicate逻辑都有自己独立的目录空间。
一次常规的ReplicatedMergeTree批量写入首先会把写入的数据按照数据分区进行拆分,然后依次处理拆分后的每个数据Block。把数据Block写入到存储的临时Data
Part后,ClickHouse需要从Zookeeper中获取下一个全局的blockNumber,这部分逻辑主要在StorageReplicatedMergeTree::allocateBlockNumber函数中,核心是调用Zookeeper生成一个Ephemeral
Sequential
Znode来获取全局唯一的序列(这里的"全局"是单个数据分区级别唯一,跨数据分区是可以重合的)。最后是commit这个Data
Part,commit的过程需要完成一系列的"检查动作"最后上传一个"GET_PART"类型的Log到Shard对应的Zookeeper目录下的的log
Znode下,其他副本通过观察Zookeeper会异步来拷贝写入的Data
Part。前序的"检查动作"目前包括,检查本地表的meta(列信息)版本是否已经落后于Zookeeper上的状态,注册写入Data
Part的columns信息和checksums到Data
Part的Znode下。从这里可以看出一次Batch写入的过程和Zookeeper交互的次数不下10次,要是Batch数据跨10个数据分区的话那就是100次。老话重提一下:使用ClickHouse时一定要做Batch写入并且按照数据分区提前聚合。
两阶段提交的设计中,有个普遍问题是往Zookeeper上提交"GET_PART" Log时zk session断开或者超时了怎么办?本地的Data Part是Commit还是Rollback?ClickHouse在这里的解法是Commit Data Part,并抛错给用户重试写入数据,同时把Data Part丢到一个异步检查线程的任务队列中,异步检查线程会等待重连Zookeeper,检查本地的Data Part是否注册在Zookeeper上,如果没有则会移除本地的Data Part。相当于一个异步的数据修复保护手段,在其他两阶段提交链路中碰到相同的问题也都是依赖这个异步检查线程来进行修复。具体的异步检查线程代码在ReplicatedMergeTreePartCheckThread::run函数中,有兴趣的同学可以仔细看一下这块代码。
ReplicatedMergeTree的写入链路有关的还有几个开关值得注意:
use_minimalistic_part_header_in_zookeeper,这是一个降低Zookeeper压力的配置(默认关闭)。开启之后每个新写入的Data Part不再注册自己的columns信息和checksums到Zookeeper上。而是压缩成Hash值写到Data Part的Znode data中。
insert_quorum,这个开关会强迫写入链路检查数据同步的副本数达到要求才能成功返回(默认是0)。开启之后写入节点在Commit Data Part时还会创建一个Shard级别的quorum/status Znode,其他节点同步完数据之后需要更新到quorum/status,写入节点这边通过Watch机制收到通知再返回客户的写入请求。这个开关不建议开启的,因为写入链路的RT肯定会明显上升,同时因为quorum/status Znode是Shard级别创建,不能再多个副本并行写入。
insert_deduplicate,简单实用的数据去重功能(默认开启)。ClickHouse会对每次收到的批量写入数据计算一个Hash Value,然后注册到Zookeeper上。后续如果出现完全重复的一批数据,写入链路上会出现Zookeeper创建重复节点异常,用户就会收到重复写入反馈。当然批量写入的Hash Value保存是有窗口大小限制的,有统一的异步后台线程会清理这些Zookeeper上的过期记录,清理的逻辑代码在ReplicatedMergeTreeCleanupThread::run函数中,有兴趣的同学可以仔细看一下这块代码。
介绍完ReplicatedMergeTree的整个同步写入过程,接下来就是多副本之间的异步同步过程了,ClickHouse为每个ReplicatedMergeTree表引擎实例创建了非常多的异步Task,所有Data Part的生命周期管理由这些异步Task共同完成。因为文章篇幅原因下面只会依次简单介绍每个Task所做的事情以及其中逻辑特别复杂的点,希望读者对ReplicatedMergeTree主备同步的逻辑复杂度有一个简单的了解:
StorageReplicatedMergeTree::queueUpdatingTask
同步Zookeeper中Shard级别下的Data Part Log任务队列数据到自己的Znode任务队列中,同时在自己的Znode下维护更新当前正在处理的log_pointer(当前已经拷贝过的最大log Id)和min_unprocessed_insert_time(近似评估写入的延迟时间)信息,最后会把任务放到节点的RAM队列中。从Shard级别的公共任务队列迁移数据到节点自己的任务队列,核心问题是高频写入时公共任务队列里的任务发布会非常频繁,需要尽快清理公共队列,防止公共任务队列膨胀,因为所有节点都在轮训读取公共队列(Zookeeper的任务队列无法增量读取)。
StorageReplicatedMergeTree::mutationsUpdatingTask
从Zookeeper的Shard级别下的Mutation任务队列同步数据到节点的RAM状态中,这里没有再为每个节点维护自己的内部Znode队列,Mutation是相对低频的操作,公共的任务队列不会有数据积压。另一个问题是Mutation操作如此低频,ClickHouse如何调度Task运行呢?这里核心的机制也是依赖Zookeeper的Watch机制来通知ClickHouse的BackgroundSchedulePool调度起工作Task,包括上一个queueUpdatingTask也是相同机制被调度。
StorageReplicatedMergeTree::queueTask
上面两个Task都是从Zookeeper中同步任务到RAM的任务队列中,而且Task都是单线程调度执行。queueTask则是负责从RAM任务队列中消费执行具体的操作,并且会有多个后台线程被调度起并行执行多个任务。由于queueTask会被并行执行,运行的过程中有一个问题是如何从RAM中的任务队列里找到下一个要执行的任务?如果Task A是merge Data Part 1 和 Data Part 2, 而Data Part 2的下载任务正在另一个线程中执行,这时Task A就不能调度执行。ClickHouse在RAM状态中追踪了所有正在执行的任务即将产生和依赖的Data Part,可以保证有数据依赖关系的任务串行化执行。对于"GET_PART"类型的任务,Task执行逻辑会尝试从远端节点下载数据到本地,同时如果有quorum数量要求的话更新quorum统计信息。这里ClickHouse对节点下载远端数据的并行数做了控制,详见参数replicated_max_parallel_fetches、replicated_max_parallel_fetches_for_[table|host]。
对于"MERGE_PARTS","MUTATE_PART"的任务,节点首先会尝试在本地进行实际的merge或者mutation动作,但是当本地的Input Data Part存在缺失或者损坏时,ClickHouse可以采用保守策略:尝试从远端下载merge完成的Data Part。当Input Data Part的数据量很大同时这个任务创建时间又很长时(远端大概率已经存在Output Data Part),ClickHouse会直接选择从远端下载的策略来跳过本地merge / mutation加速任务执行。在大规模数据场景下,每次merge、mutation的开销都是非常大的,配置只选择主副本完成merge、mutation任务,而让其他副本直接从远程下载可以大幅减轻集群的负载。
本文系作者在时代Java发表,未经许可,不得转载。
如有侵权,请联系nowjava@qq.com删除。