ZooKeeper 是个针对大型分布式系统的高可用、高性能且具有一致性的开源协调服务,被广泛的使用。对于开发人员,ZooKeeper 是一个学习和实践分布式组件的不错的选择。本文对 ZooKeeper 的源码进行简析,也会介绍 ZooKeeper 实践经验,希望能帮助到初学 ZooKeeper 的朋友。文章部分内容参考了一些网络文章,已标注在末尾参考文献中。
初衷
在业务中使用了 ZooKeeper 作为消息系统,在开发和运维过程中,也遇到一些问题,萌发了阅读源码窥视实现细节的想法。同时我们运维的 ZooKeeper 集群规模和数据规模非常大,也想把运维的经验分享出来供参考去规避风险点和性能调优。
目标读者
本文是介绍 ZooKeeper 基础知识和源码分析的入门级材料,适合用于初步进入分布式系统的开发人员,以及使用 ZooKeeper 进行生产经营的应用程序运维人员。
本文内容介绍
第 1 章节:主要介绍 ZooKeeper 使命、地位、基础的概念和基本组成模块。
第 2 章节:主要介绍 ZooKeeper 内部运行原理,此部分主要从书籍《ZooKeeper 分布式过程协同技术详解》摘录,对于有 ZooKeeper 基础的可以略过。写第二章节的主要目的,不先陷入解析源码的繁琐的实现上,而是从系统和底层看 ZooKeeper 如何运行,通过从高层次介绍其所使用的协议,以及 ZooKeeper 所采用的在提高性能的同时还具备容错能力的机制,。
第 3 章节:简析 ZooKeeper 的源码实现,主要目的去介绍 ZooKeeper 集群的工作流程,给出看源码的简要指引,能更快上手去深入阅读源码
第 4 章节:主要介绍业务用 zookeeper 做消息系统的实践,在实践中的优化点和踩坑的地方,由于业务场景和规模的差别,关注点和优化点也差别很大,也欢迎在评论区更新使用 ZooKeeper 共性问题。
在大数据和云计算盛行的今天,应用服务由很多个独立的程序组成,这些独立的程序则运行在形形色色,千变万化的一组计算机上,而如何让一个应用中的多个独立的程序协同工作是一件非常困难的事情。而 ZooKeeper 就是一个分布式的,开放源码的分布式应用程序协调服务。它使得应用开发人员可以更多的关注应用本身的逻辑,而不是协同工作上。从系统设计看,ZooKeeper 从文件系统 API 得到启发,提供一组简单的 API,使得开发人员可以实现通用的协作任务,例如选举主节点,管理组内成员的关系,管理元数据等,同时 ZooKeeper 的服务组件运行在一组专用的服务器之上,也保证了高容错性和可扩展性。
本章节主要从 ZooKeeper 的使命、ZooKeeper 基础、工业级案例,来简要介绍 ZooKeeper。
Apache ZooKeeper 的官方介绍,ZooKeeper is a distributed, open-source coordination service for distributed applications. It exposes a simple set of primitives that distributed applications can build upon to implement higher level services for synchronization, configuration maintenance, and groups and naming. It is designed to be easy to program,and uses a data model styled after the familiar directory tree structure of file systems. It runs in Java and has bindings for both Java and C.
从上面的官方介绍看,ZooKeeper 主要的系统功能是在分布式系统中协作多个任务。例如,典型的主-从工作模式中,我们需要主节点和从节点进行协作,在从节点处于空闲状态时会通知主节点可以接受工作,于是主节点就会分配任务给从节点,同时我们只想有一个主节点,而很多进程可能都想成为主节点,这些操作都是要在多个任务中进行协作。另外,协同并不总是采取像主节点选举或者加锁等同步原语的形式,配置元数据也是一个进程通知其他进程需要做什么的一种常用数据,例如,在一个主-从系统中,从节点需要知道任务已经分配给他们,即便在主节点发生崩溃的情况下,这些信息也需要有效。
在 ZooKeeper 之前,一些系统也可以采用分布式锁管理器或者分布式数据库来实现协作,例如,用数据库,redis 实现分布式锁。
那么 ZooKeeper 改变了什么呢?ZooKeeper 的设计更专注于任务协作,它不提供任何锁的接口或者通用的存储数据的接口,也没有强加任何特殊的同步原语,而是提供一个更加敏捷健壮的分布协作方案,例如在主-从模型中,ZooKeeper 没有为应用实现主节点选举,或者进程存活与否的跟踪功能,但是,ZooKeeper 提供了实现这些任务的工具,对于实现什么样的协同任务,有开发人员自己决定。
分布式系统中关键在于进程通信,其有两种选择:直接通过网络进行信息交换,或者读写某些共享存储。对于 ZooKeeper 实现协作和同步原语本质上是使用共享存储模型,即开发的应用是连接到 ZooKeeper 服务器端的客户端,他们连接到 ZooKeeper 服务器端进行相关的操作,以来影响服务器端存储的共享数据,最终应用间实现协作。
ZooKeeper 不适合的场景
整个 ZooKeeper 的服务器集群管理着应用协作的关键数据,ZooKeeper 不适合用作海量的数据存储,对于需要海量的应用数据的情况,可以使用数据库和分布式文件系统,所以在设计应用时,最佳实践是把应用数据和协同数据独立分开。
本小节简要介绍 ZooKeeper 数据结构、监控与通知、服务架构,详细的介绍请查看 ZooKeeper 技术文档和书籍。
ZooKeeper 采用类似于文件系统的层级树状结构进行管理 Znode,并且暴露操作 API 接口。
Znode 的节点类型:在新建 znode 节点,需要指定该节点的类型,不同的类型决定了 znode 节点的行为方式,znode 的类型分为持久节点、时节点、有序节点,组合 4 中类型,持久的,临时的,持久有序的,临时有序的。对于持久节点,只能主动调用 delete 来删除,而临时的 znode,在当创建该节点的客户端崩溃或者关闭了与 ZooKeeper 的连接时,这个节点就会被删除。一般持久类型的 znode 为应用保存数据,即使 znode 的创建者不再属于应用系统时,数据会保存下来而不丢失。临时 Znode 仅当创建者的会话有效时这些信息必须有效保存,会话超时或者主动关闭时,临时 znode 会自动消失。有序 Znode 节点是被分配唯一一个单调递增的整数。
API 接口
create /path data 创建一个名为/path 的 znode 节点,并包含数据 data。
delete /path 删除名为/path 的 znode。
exists /path 检查是否存在名为/path 的节点。
setData /path data
getData /path
getChildren /path
需要注意的是,ZooKeeper 并不允许局部写入或读取 znode 节点的数据。当设置一个 znode 节点的数据或读取时,znode 节点的内容会被整个替换或者全部读取进来,特别是 getChildren,如果是数据量比较大,会获取大量的数据。
ZooKeeper 客户端获得服务器的数据或者变化,不是通过轮询的模式,而是基于通知的机制,客户端向 ZooKeeper 服务器端注册需要接收通知的 znode,通过对 znode 设置监视点来接收通知,需要强调的是监视点是一个单次触发的操作。
ZooKeeper 服务器端运行于两种模式下:独立模式和仲裁模式。独立服务器只有一个单独的服务器,ZooKeeper 状态无法复制。而在仲裁模式下,具有一组 ZooKeeper 服务器,称为 ZooKeeper 集合,它们之间可以进行状态的复制,并同时服务客户端的请求。不过服务器集合并不会让客户端等待每个服务器完成数据保存后再继续,而是在满足仲裁数目的服务器保存或者同步了状态就会返回给客户端。在解决这一分布式数据一致性,ZooKeeper 采用 ZAB(ZooKeeper Atomic Broadcast)的一致性协议,关于 ZAB 协议后面会详细的介绍。
ZooKeeper 客户端在服务器集群中执行任何请求前必须先与服务器建立会话(session),客户端提交给 ZooKeeper 的所有操作均关联在一个会话上。客户端初始化连接到集合中某个服务器或一个独立的服务器,客户端提供TCP 协议与服务器进行连接并通信,但当会话无法与当前连接的服务器继续通信时,会话就可能转移到另外一个服务器,ZooKeeper 客户端透明地转移一个会话到不同的服务器。需要指明的,会话提供了顺序保障,同一个会话中的请求会以 FIFO(先进先出)顺序执行。
Apache HBase
HBase 是一个通常与 Hadoop 一起使用的数据存储仓库。在 HBase 中,ZooKeeper 用于选举一个集群内的主节点,以便跟踪可用的服务器,并保持集群的元数据。
Apache Kafka
Kafka 是一个基于发布-订阅模型的消息系统。其中 ZooKeeper 用于检测崩溃,实现主题的发现,并保持主题的生产和消费状态。
Apache Solr
Solr 是一个企业级的搜索平台,它使用 ZooKeeper 来存储集群的元数据,并协作更新这些元数据。
ZooKeeper 应该是 “The King Of Coordination for Big Data”!
本章节是介绍 ZooKeeper 内部是如何运行的,通过从高层次介绍其所使用的协议,以及 ZooKeeper 所采用的在提供高性能的同时还具有容错能力的机制。总体来说 ZooKeeper 运行于一个集群环境中,选举出某个服务器作为群首(Leader),其他服务器追随群首(Follower)。群首作为中心处理所有对 ZooKeeper 系统变更的请求,它就像一个定序器,建立了所有对 ZooKeeper 状态的更新的顺序,追随者接收群首所发出更新操作请求,并对这些请求进行处理,以此来保障状态更新操作不会发生碰撞。
ZooKeeper 服务器会在本地处理只读请求(exists、getData、getChildren),例如一个服务器接收客户端的 getData 请求,服务器读取该状态信息,并把这些信息返回给客户端。
那些会改变 ZooKeeper 状态的客户端请求(create,delete 和 setData)将会转发到群首,群首执行对应的请求,并形成状态的更新,称为事务(transaction),其中事务要以原子方式执行。同时,一个事务还要具有幂等性,事务的幂等性在我们进行恢复处理时更加简单,后面我们可以看到如何利用幂等性进行数据恢复或者灾备。
在群首产生了一个事务,就会为该事务分配一个标识符,称为会话 id(zxid),通过 Zxid 对事务进行标识,就可以按照群首所指定的顺序在各个服务器中按序执行。服务器之间在进行新的群首选举时也会交换 zxid 信息,这样就可以知道哪个无故障服务器接收了更多的事务,并可以同步他们之间的状态信息。
Zxid 为一个 long 型(64 位)整数,分为两部分:时间戳(epoch)部分和计数器(counter)部分。每一部分为 32 位,在我们讨论 zab 协议时,我们就会发现时间戳(epoch)和计数器(counter)的具体作用,我们通过 zab 协议来广播各个服务器的状态变更信息。
群首为集群中的服务器选择出来的一个服务器,并会一直被集群所认可。设置群首的目的是为了对客户端所发起的 ZooKeeper 状态更新请求进行排序,包括 create,setData 和 delete 操作。群首将每一个请求转换为一个事务,将这些事务发送给追随者,确保集群按照群首确定的顺序接受并处理这些事务。
每个服务器启动后进入 LOOKING 状态,开始选举一个新的群首或者查找已经存在的群首。如果群首已经存在,其他服务器就会通知这个新启动的服务器,告知哪个服务器是群首,于此同时,新服务器会与群首建立连接,以确保自己的状态与群首一致。如果群首中的所有的服务器均处于 LOOKING 状态,这些服务器之间就会进行通信来选举一个群首,通过信息交换对群首选举达成共识的选择。在本次选举过程中胜出的服务器将进入 LEADING 状态,而集群中其他服务器将会进入 FOLLOWING 状态。
具体看,一个服务器进入 LOOKING 状态,就会发送向集群中每个服务器发送一个通知信息,该消息中包括该服务器的投票(vote)信息,投票中包含服务器标识符(sid)和最近执行事务的 zxid 信息。
当一个服务器收到一个投票信息,该服务器将会根据以下规则修改自己的投票信息:
从上面的投票过程可以看出,只有最新的服务器将赢得选举,因为其拥有最近一次的 zxid。如果多个服务器拥有的最新的 zxid 值,其中的 sid 值最大的将会赢得选举。
当一个服务器连接到仲裁数量的服务器发来的投票都一样时,就表示群首选举成功,如果被选举的群首为某个服务器自己,该服务器将会开始行使群首角色,否则就会成为一个追随者并尝试连接被选举的群首服务器。一旦连接成功,追随者和群首之间将会进行状态同步,在同步完成后,追随者才可以进行新的请求。
在接收到一个写请求操作后,追随者会将请求转发给群首,群首将会探索性的执行该请求,并将执行结果以事务的方式对状态更新进行广播。如何确认一个事务是否已经提交,ZooKeeper 由此引入了 zab 协议,即原子广播协议(ZooKeeper Atomic Broadeast protocol)。该协议提交一个事务非常简单,类型于一个两阶段提交。
Zab 保障了以下几个重要的属性
如果群首按顺序广播了事务 T 和事务 T,那么每个服务器在提交 T 事务前保证事务 T 已经完成提交。
如果某个服务器按照事务 T 和事务 T 的顺序提交了事务,所有其他服务器也必然会在提交事务 T 前提交事务 T。
第一个属性保证事务在服务器之间传送顺序的一致,而第二个竖向保证服务器不会跳过任何事务。
观察者与追随者有一些共同的特点,他们提交来自群首的提议,不同于追随者的是,观察者不参与选举过程,他们仅仅学习经由 INFORM 消息提交的提议。
引入观察者的一个主要原因是提高读请求的可扩展性。通过加入多个观察者,我们可以在不牺牲写操作的吞吐率的前提下服务更多的读操作。但是引入观察者也不是完全没有开销,每一个新加入的观察者将对应于每一个已提交事务点引入的一条额外消息。
采用观察者的另外一个原因是进行跨多个数据中心部署。由于数据中心之间的网络链接延时,将服务器分散于多个数据中心将明显地降低系统的速度。引入观察者后,更新请求能够先以高吞吐量和低延迟的方式在一个数据中心内执行,接下来再传播到异地的其他数据中心得到执行。
群首,追随者,观察者根本上都是服务器。在实现服务器主要抽象概念是请求处理器。请求处理器是对处理流水线上不同阶段的抽象,每个服务器实现一个请求处理器的序列。
2.5.1 独立服务器
PrepRequestProcessor 接受客户端的请求并执行这个请求,处理结果则是生成一个事务。不过只有改变 ZooKeeper 状态的操作才会产生事务,对于读操作并不会产生任何事务。
SyncRequestProcessor 负责将事务持久化到磁盘上。实际上就是将事务数据按照顺序追加到事务日志中,并形成快照数据。
最后一个处理器为 FinalRequestProcessor,如果 Request 对象包含事务数据,该处理器就会接受对 ZooKeeper 数据树的修改,否则,该处理器会从数据树中读取数据并返回客户端。
2.5.2 群首服务器
在切换到仲裁模式时,服务器的流水线则有一些变化。
第一个处理器同样是 PrepRequestProcessor,而之后的处理器则为 ProposalRequestProcessor,该处理器会准备一个提议,并将该提议发送给跟随者,并且会把所有请求转发给 CommitRequestProcessor,对于写操作请求,还会把请求转发给 SyncRequestProcessor 处理器。
SyncRequestProcessor 和独立服务器的功能一样,是持久化事务到磁盘上,执行完后会触发 AckRequestProcessor 处理器,它仅仅生成确认消息并返回给自己。
CommitRequestProcessor 会将收到足够多的确认消息的提议进行提交。
2.5.3 追随者和观察者服务器
Follower 服务器是先从 FollowerRequestProcessors 处理器开始,该处理器接收并处理客户端请求,FollowerRequestProcessors 处理器之后转发请求给 CommitRequestProcessor,同时也会转发写请求到群首服务器。CommitRequestProcessor 会直接转发读取请求到 FinalRequestProcessor 处理器,而且对于写请求,在转发前会等待提交事务。而群首接收到一个新的写请求时会生成一个提议,之后转发到追随者服务器,在收到一个提议,追随服务器会发送这个提议到 SyncRequestProcessor,SendRequestProcessor 会向群首发送确认消息。
当群首服务器接收到足够多确认消息来提交这个提议是,群首就会发送提交事务消息给追随者,当收到提交的事务消息时,追随者就通过 CommitRequestProcessor 处理器进行处理。为了保证执行的顺序,CommitRequestProcessor 处理器会在收到一个写请求处理器时暂停后续的请求处理。
对于观察者服务器不需要确认提议消息,因此观察者服务器并不需要发送确认消息给群首服务器,一般情况下,也不用持久化事务到磁盘。对于观察者服务器是否持久化事务到磁盘,以便加速观察者服务器的恢复速度,可以根据具体情况决定。
SyncRequestProcessor 处理器是用于处理提议写入的日志和快照。
日志和磁盘的使用
服务器通过事务日志来持久化事务。在接受一个提议时,一个服务器就会将提议的事务持久化到事务日志中,该事务日志保存在服务器本地磁盘中,而事务将会按照顺序追加其后。写事务日志是写请求操作的关键路径,因此 ZooKeeper 必须有效处理写日志问题。在持久化事务到磁盘时,还有一个重要说明:现代操作系统通常会缓存脏页(Dirty Page),并将他们异步写入磁盘介质。然而,我们需要在继续之前,要确保事务已经被持久化。因此我们需要冲刷(Flush)事务到磁盘介质。
冲刷在这里就是指我们告诉操作系已经把脏页写入到磁盘,并在操作完成后返回。同时为了提高 ZooKeeper 系统的运行速度,也会使用组提交和补白的。其中组提交是指一次磁盘写入时追加多个事务,可以减少磁盘寻址的开销。补白是指在文件中预分配磁盘存储块。
快照
快照是 ZooKeeper 数据树的拷贝副本,每一个服务器会经常以序列化整个数据树的方式来提取快照,并将这个提取的快照保存到文件。服务器在进行快照时不需要进行协作,也不需要暂停处理请求。因此服务器在进行快照时还会继续处理请求,所以当快照完成时,数据树可能又发生了变化,称为快照是模糊的,因为它们不能反映出在任意给定的时间点数据树的准确的状态。
会话(session)是 ZooKeeper 的一个重要的抽象。保证请求有序,临时 znode 节点,监控点都与会话密切相关。因此会话的跟踪机制对 ZooKeeper 来说也是非常重要的。
在独立模式下,单个服务器会跟踪所有的会话,而在仲裁模式下则由群首服务器来跟踪和维护。而追随者服务器仅仅是简单地把客户端连接的会话信息转发到群首服务器。
为了保证会话的存活,服务器需要接收会话的心跳信息。心跳的形式可以是一个新的请求或者显式的 ping 信息。两种情况下,服务器通过更新会话的过期时间来触发会话活跃,在仲裁模式下,群首服务器发送一个 PING 信息给它的追随者们,追随者们返回自从最新一次 PING 消息之后的一个 session 列表。群首服务器每半个 tick 就会发送一个 ping 信息给追随者们。
监视点是由读取操作所设置的一次性触发器,每个监视点有一个特定操作来触发,即通过监视点,客户端可以对指定的 znode 节点注册一个通知请求,在发生时就会收到一个单次的通知。监视点只会存在内存,而不会持久化到硬盘,当客户端与服务端的连接断开时,它的所有的监视点会从内存中清除。因为客户端也会维护一份监视点的数据,在重连之后,监视点数据会再次同步到服务端。
在客户端库中有 2 个主要的类:ZooKeeper 和 ClientCnxn,写客户端应用程序时通过实例化 ZooKeeper 类来建立一个会话。一旦建立起一个会话,ZooKeeper 就会使用一个会话标识符来关联这个会话。这个会话标识符实际上是有服务端所生产的。
ClientCnxn 类管理连接到 server 的 socket 连接。该类维护一个可连接的 ZooKeeper 的服务列表,并当连接断掉的时候无缝地切换到其他服务器,当重连到一个其他的服务器时会使用同一个会话,客户端也会重置所有的监视点到刚连接的服务器上。
对于网络传输和磁盘保存的序列化消息和事务,ZooKeeper 使用了 Hadoop 中的 Jute 来做序列化。
以 3.5.5 版本作为分析。主要从服务端,客户端,以及服务端和客户端结合的部分分析源码。在分析源码时,主要从数据结构,类结构,线程模型,流程等方面看。(注:本章节参考了网上 ZooKeeper 的分析文章,借用了不少文字描述。)
ZooKeeper 服务的启动方式分为三种,即单机模式、伪分布式模式、分布式模式。本章节主要研究分布式模式的启动模型,其主要要经过 Leader 选举,集群数据同步,启动服务器。
分布式模式下的启动过程包括如下阶段,
注:本章节主要是参考网上 blog 文章,对部分内容作了调整与处理。
具体细节如下,
start 方法实现的业务主要包含四个方面:
1.loadDataBase:涉及到的核心类是 ZKDatabase,并借助于 FileTxnSnapLog 工具类将 snap 和 transaction log 反序列化到内存中,最终构建出内存数据结构 DataTree。
2.cnxnFactory.start:之前介绍过 ServerCnxnFactory 作用,ServerCnxnFactory 本身也可以作为一个线程。
3.startLeaderElection():这个主要是初始化一些 Leader 选举工作。
Leader 选举涉及到节点间的网络 IO,QuorumCnxManager 就是负责集群中各节点的网络 IO,QuorumCnxManager 包含一个内部类 Listener,Listener 是一个线程,这里启动 Listener 线程,主要启动选举监听端口并处理连接进来的 Socket;FastLeaderElection 就是封装了具体选举算法的实现。
4.super.start():QuorumPeer 本身也是一个线程,其继承了 Thread 类,这里就是启动 QuorumPeer 线程,就是执行 QuorumPeer.run 方法。
QuorumPeer 线程进入到一个无限循环模式,不停的通过 getPeerState 方法获取当前节点状态,然后执行相应的分支逻辑。大致流程可以简单描述如下:
a.首先系统刚启动时 serverState 默认是 LOOKING,表示需要进行 Leader 选举,这时进入 Leader 选举状态中,会调用 FastLeaderElection.lookForLeader 方法,lookForLeader 方法内部也包含了一个循环逻辑,直到选举出 Leader 才会跳出 lookForLeader 方法,如果选举出的 Leader 就是本节点,则将 serverState=LEADING 赋值,否则设置成 FOLLOWING 或 OBSERVING。
b.然后 QuorumPeer.run 进行下一轮次循环,通过 getPeerState 获取当前 serverState 状态,如果是 LEADING,则表示当前节点当选为 LEADER,则进入 Leader 角色分支流程,执行作为一个 Leader 该干的任务;如果是 FOLLOWING 或 OBSERVING,则进入 Follower 或 Observer 角色,并执行其相应的任务。注意:进入分支路程会一直阻塞在其分支中,直到角色转变才会重新进行下一轮次循环,比如 Follower 监控到无法与 Leader 保持通信了,会将 serverState 赋值为 LOOKING,跳出分支并进行下一轮次循环,这时就会进入 LOOKING 分支中重新进行 Leader 选举。
在服务器启动阶段需要进行数据恢复阶段。
Leader 选举初始化 QuorumPeer.startLeaderElection(),Leader 选举涉及到两个核心类:QuorumCnxManager 和 FastLeaderElection。
Leader 选举期间集群中各节点之间互相进行投票,就会涉及到网络 IO 通信,QuorumCnxManager 就是用来管理维护选举期间网络 IO 通信的工具类。选举算法逻辑被封装在 FastLeaderElection 类。
在 createElectionAlgorithm()算法中,创建一个 QuorumCnxManager 实例,启动 QuorumCnxManager.Listener 线程,构建选举算法 FastLeaderElection,然后相互交互投票信息,进入 Leader 选举过程。
QuorumCnxManager 有一个内部类 Listener,初始化一个 ServerSocket,然后在一个 while 循环中调用 accept 接收客户端(注意:这里的客户端指的是集群中其它服务器)连接。当有客户端连接进来后,会将该客户端 Socket 封装成 RecvWorker 和 SendWorker,它们都是线程,分别负责和该 Socket 所代表的客户端进行读写。其中,RecvWorker 和 SendWorker 是成对出现的,每对负责维护和集群中的一台服务器进行网络 IO 通信。
FastLeaderElection 负责 Leader 选举核心规则算法实现,包含了两个内部类 WorkerSender 和 WorkerReceiver 线程。
FastLeaderElection 中进行选举时广播投票信息时,将投票信息写入到对端服务器大致流程如下:
a、将数据封装成 ToSend 格式放入到 sendqueue;
b、WorkerSender 线程会一直轮询提取 sendqueue 中的数据,当提取到 ToSend 数据后,会获取到集群中所有参与 Leader 选举节点(除 Observer 节点外的节点)的 sid,如果 sid 即为本机节点,则转成 Notification 直接放入到 recvqueue 中,因为本机不再需要走网络 IO;否则放入到 queueSendMap 中,key 是要发送给哪个服务器节点的 sid,ByteBuffer 即为 ToSend 的内容,queueSendMap 维护的着当前节点要发送的网络数据信息,由于发送到同一个 sid 服务器可能存在多条数据,所以 queueSendMap 的 value 是一个 queue 类型;
c、QuorumCnxManager 中的 SendWorkder 线程不停轮询 queueSendMap 中是否存在自己要发送的数据,每个 SendWorkder 线程都会绑定一个 sid 用于标记该 SendWorkder 线程和哪个对端服务器进行通信,因此,queueSendMap.get(sid)即可获取该线程要发送数据的 queue,然后通过 queue.poll()即可提取该线程要发送的数据内容;
d、然后通过调用 SendWorkder 内部维护的 socket 输出流即可将数据写入到对端服务器。
FastLeaderElection 中进行选举时广播投票信息时,从对端服务器读取投票信息的大致流程如下:
a、QuorumCnxManager 中的 RecvWorker 线程会一直从 Socket 的输入流中读取数据,当读取到对端发送过来的数据时,转成 Message 格式并放入到 recvQueue 中;
b、FastLeaderElection.WorkerReceiver 线程会轮询方式从 recvQueue 提取数据并转成 Notification 格式放入到 recvqueue 中;
c、FastLeaderElection 从 recvqueu 提取所有的投票信息进行比较 最终选出一个 Leader。
Leader 选举算法实现
上面已经介绍了 Leader 选举期间网络 IO 的大致流程,下面介绍下具体选举算法如何实现。
QuorumPeer 线程中会有一个 Loop 循环,获取 serverState 状态后进入不同分支,当分支退出后继续下次循环,FastLeaderElection 选举策略调用就是发生在检测到 serverState 状态为 LOOKING 时进入到 LOOKING 分支中调用的。
进入到 LOOKING 分支执行的代码逻辑:
setCurrentVote(makeLEStrategy().lookForLeader());
从上面代码可以看出,Leader 选举策略入口方法为:FastLeaderElection.lookForLeader()方法。当 QuorumPeer.serverState 变成 LOOKING 时,该方法会被调用,表示执行新一轮 Leader 选举。下面来看下 lookForLeader 方法的大致实现逻辑:
1.更新自己期望投票信息,即自己期望选哪个服务器作为 Leader(用 sid 代替期望服务器节点)以及该服务器 zxid、epoch 等信息,第一次投票默认都是投自己当选 Leader,然后调用 sendNotifications 方法广播该投票到集群中所有可以参与投票服务器,广播涉及到网络 IO 流程前面已讲解,这里就不再细说;
其中,updateProposal()方法有三个参数:a.期望投票给哪个服务器(sid)、b.该服务器的 zxid、c.该服务器的 epoch,在后面会看到这三个参数是选举 Leader 时的核心指标,后面再介绍。
首先对之前提到的选举轮次 electionEpoch 进行判断,这里分为三种情况:
a.只有对方发过来的投票的 electionEpoch 和当前节点相等表示是同一轮投票,即投票有效,然后调用 totalOrderPredicate()对投票进行 PK,返回 true 代表对端胜出,则表示第一次投票是错误的(第一次都是投给自己),更新自己投票期望对端为 Leader,然后调用 sendNotifications()将自己最新的投票广播出去。返回 false 则代表自己胜出,第一次投票没有问题,就不用管
b.如果对端发过来的 electionEpoch 大于自己,则表明重置自己的 electionEpoch,然后清空之前获取到的所有投票 recvset,因为之前获取的投票轮次落后于当前则代表之前的投票已经无效了,然后调用 totalOrderPredicate()将当前期望的投票和对端投票进行 PK,用胜出者更新当前期望投票,然后调用 sendNotifications()将自己期望头破广播出去。注意:这里不管哪一方胜出,都需要广播出去,而不是步骤 a 中己方胜出不需要广播,这是因为由于 electionEpoch 落后导致之前发出的所有投票都是无效的,所以这里需要重新发送
c.如果对端发过来的 electionEpoch 小于自己,则表示对方投票无效,直接忽略不进行处理
totalOrderPredicate()实现了对投票进行 PK 规则:
下面简单说下这个 PK 逻辑原理(胜出一方代表更有希望成为 Leader):
1、首先比较 epoch,哪个 epoch 哪个胜出,前面介绍过 epoch 代表了 Leader 的轮次,是一个递增的,epoch 越大就意味着数据越新,Leader 数据越新则可以减少后续数据同步的效率,当然应该优先选为 Leader;
2、然后才是比较 zxid,由于 zxid=epoch+counter,第一步已经把 epoch 比较过了,其实这步骤只是相当于比较 counter 大小,counter 越大则代表数据越新,优先选为 Leader。注:其实第 1 和第 2 可以合并到一起,直接比较 zxid 即可,因为 zxid=epoch+counter,第 1 比较显的有些多余
3、如果前两个指标都没法比较出来,只能通过 sid 来确定,zxid 相等说明两个服务器的数据是一致的,所以选哪个当 Leader 其实没有区别,这里就选择 sid 大的当 Leader .
下面来看下 LOOKING 分支的最后一部分逻辑:
Leader 选举的流程,ZooKeeper 集群在 Leader 选举完成后,集群中的各个节点就确定了自己的角色信息:Leader、Follower 或 Observer。
如上述代码所述,节点确定了自己的角色后,就会进入自己的角色分支:对于 Leader 而言创建 Leader 实例并调用其 lead()函数,对于 Follower 而言创建 Follower 实例并调用其 followLeader()函数,对于 Observer 而言创建 Observer 实例并调用其 observeLeader()函数。在这三个函数中,服务器会进行相关的初始化并完成最终的启动。
对于 Follower 和 Observer 而言,主要的初始化工作是要建立与 Leader 的连接并同步 epoch 信息,最后完成与 Leader 的数据同步。而 Leader 会启动 LearnerCnxAcceptor 线程,该线程会接受来自 Follower 和 Observer(统称为 Learner)的连接请求并为每个连接创建一个 LearnerHandler 线程,该线程会负责包括数据同步在内的与 learner 的一切通信。
Learn(Follower 或 Observer)节点会主动向 Leader 发起连接,ZooKeeper 就会进入集群同步阶段,集群同步主要完成集群中各节点状态信息和数据信息的一致。选出新的 Leader 后的流程大致分为:计算 epoch、统一 epoch、同步数据、广播模式等四个阶段。其中其前三个阶段:计算 epoch、统一 epoch、同步数据就是这一节主要介绍的集群同步阶段的主要内容,这三个阶段主要完成新 Leader 与集群中的节点完成同步工作,处于这个阶段的 zk 集群还没有真正做好对外提供服务的能力,可以看着是新 leader 上任后进行的内部沟通、前期准备工作等,只有等这三个阶段全部完成,新 leader 才会真正的成为 leader,这时 zk 集群会恢复正常可运行状态并对外提供服务。
被选举为 Leader 角色的节点,会创建一个 Leader 实例,然后执行 Leader.lead()进入到 Leader 角色的任务分支中,其流程大致如下所示:
Leader 分支大致可以分为 5 个阶段:启动 LearnerCnxAcceptor 线程、计算 newEpoch、广播 newEpoch、数据同步和集群状态监测。
Leader.lead()方法控制着 Leader 角色节点的主体流程,其实现较为简单,大致模式都是通过阻塞方法阻塞当前线程,直到该阶段完成 Leader 线程才会被唤醒继续执行下一个阶段;而每个阶段实现的具体细节及大量的网络 IO 操作等都在 LearnerHandler 中实现。比如计算 newEpoch,Leader 中只会判断 newEpoch 计算完成没,没有计算完成就会进入阻塞状态挂起当前 Leader 线程,直到集群中一半以上的节点同步了 epoch 信息后 newEpoch 正式产生才会唤醒 Leader 线程继续向下执行;而计算 newEpoch 会涉及到 Leader 去收集集群中大部分 Learner 服务器的 epoch 信息,会涉及到大量的网络 IO 通信等内容,这些细节部分都在 LearnerHandler 中实现。
涉及到网络 IO 就会存在 Server 和
Client,这里的 Server 就是 Leader,Client 就是 Learner(Follower 和 Observer 统称
Learner),对于 Server 端,主要关注 Leader 和 LearnerHandler 这两个类,而对于 Client
端,根据角色分类主要关注 Follower 或 Observer 这两个类。
ZooKeeper 中主要存在三个端口:
1、客户端请求端口:对应于配置中的 clientPort,默认是 2181,就是客户端连接 ZK 对其进行增删改操作的端口;
2、集群选举端口:之前分析过的集群中 Leader 选举涉及到网络 IO 使用的端口,对应于配置中“server.0=10.80.8.3:2888:2999”这里的 2999 就是集群选举端口;
3、集群同步端口:Leader 选举出后就会涉及到 Leader 和 Learner 之间的数据同步问题,集群同步端口的作用就是做这个使用的,对应于配置中”server.0=10.80.8.3:2888:2999“这里的 2888;
启动 LearnerCnxAcceptor 线程
Leader 首先会启动一个 LearnerCnxAcceptor 线程,这个线程做的工作就非常简单了,就是不停的循环 accept 接收 Learner 端的网络请求(这里的监听端口就是上面说的同步监听端口,而不是选举端口),Leader 选举结束后被分配为 Follower 或 Observer 角色的节点会主动向 Leader 发起连接,Leader 端接收到一个网络连接就会封装成一个 LearnerHandler 线程。
Leader 类可以看成一个总管,和每个 Learner 服务器的交互任务都会被分派给 LearnerHandler 这个助手完成,当 Leader 检测到一个任务被一半以上的 LearnerHandler 处理完成,即认为该阶段结束,进入下一个阶段。
计算 epoch
epoch 在 ZooKeeper 中是一个很重要的概念,前面也介绍过了:epoch 就相当于 Leader 的身份编号,就如同身份证编号一样,每次选举产生一个新 Leader 时,都会为该 Leader 重新计算出一个新 epoch。epoch 被设计成一个递增值,比如上一个 Leader 的 epoch 是 1,假如重新选举新的 Leader 就会被分配 epoch=1。
epoch 作用:可以防止旧 Leader 活过来后继续广播之前旧提议造成状态不一致问题,只有当前 Leader 的提议才会被 Follower 处理。ZooKeeper 集群所有的事务请求操作都要提交由 Leader 服务器完成,Leader 服务器将事务请求转成一个提议(Proposal)并分配一个事务 ID(zxid)后广播给 Learner,zxid 就是由 epoch 和 counter(递增)组成,当存在旧 leader 向 follower 发送命令的时候,follower 发现 zxid 所在的 epoch 比当前的小,则直接拒绝,防止出现不一致性。
统一 epoch
newEpoch 计算完成后,该值只有 Leader 知道,现在需要将 newEpoch 广播到集群中所有的服务器节点上,让他们都更新下新 Leader 的 epoch 信息,这样他们在处理请求时会根据 epoch 判断该请求是不是当前新 Leader 发出的,可以防止旧 Leader 活过来后继续广播之前旧提议造成状态不一致问题,只有当前 Leader 的提议才会被 Follower 处理。
总结:广播 newEpoch 流程也比较简单,就是将之前计算出来的 newEpoch 封装到 LEADERINFO 数据包中,然后广播到集群中的所有节点,同时会收到 ACKEPOCH 回复数据包,当集群中一半以上的节点进行了回复则可以认为 newEpoch 广播完成,则进入下一阶段。同样,为避免线程一直阻塞,休眠线程依然会被添加超时时间,超时后仍未完成则抛出 InterruptedException 异常重新进入 Leader 选举状态。
数据同步
之前分析过 Leader 的选举策略:lastZxid 越大越会被优先选为 Leader。lastZxid 是节点上最大的事务 ID,由于 zxid 是递增的,lastZxid 越大,则表示该节点处理的数据越新,即数据越完整。所以,被选为 Leader 的节点数据完整性越高,为了数据一致性,这时就需要其它节点和 Leader 进行数据同步保持数据一致性。
数据同步四种情况:
DIFF,learner 比 leader 少一些数据;
TRUNC,learner 数据比 leader 多;
DIFF+TRUNC,learner 对 leader 多数据又少数据;
SNAP,learner 比 leader 少很多数据。
群首,追随者,观察者根本上都是服务器,在实现服务器主要抽象概念是请求处理器。请求处理器是对处理流水线上不同阶段的抽象,每个服务器在初始化时实现一个请求处理器的序列。对于请求处理器,ZooKeeper 代码里有一个叫 RequestProcessor 的接口,这个接口的主要方法是processRequest,它接受一个 Request 参数,在一个请求处理器的流水线中,对于相邻处理器的请求的处理是通过队列实现解耦合。当一个处理器有一条请求需要下一个处理器进行处理时,它将这条请求加入队列中。然后,它将处于等待状态直到下一个处理器处理完此消息。本节主要看看各个服务器的请求处理器序列初始化和对队列的使用与处理,处理器的细节可以参考源码。
独立服务器请求链
独立服务器是从 ZooKeeperServerMain.java 开始,
在 PrepRequestProcessor 中,消费请求队列 submittedRequests,数据结构如下
LinkedBlockingQueue<Request> submittedRequests = new LinkedBlockingQueue<Request>();
PrepRequestProcessor 接受客户端的请求并执行这个请求,处理结果则是生成一个事务。不过只有改变 ZooKeeper 状态的操作才会产生事务,对于读操作并不会产生任何事务。
SyncRequestProcessor.java,SyncRequestProcessor 负责将事务持久化到磁盘上。实际上就是将事务数据按照顺序追加到事务日志中,并形成快照数据。
FinalRequestProcessor.java,FinalRequestProcessor,如果 Request 对象包含事务数据,该处理器就会接受对 ZooKeeper 数据树的修改,否则,该处理器会从数据树中读取数据并返回客户端。
群首服务器(Leader)
请求链
Follower
Observer
参考资料:https://www.jianshu.com/p/45f8a966fb47
从整体看,客户端启动的入口时 ZooKeeperMain,在 ZooKeeperMain 的 run()中,创建出控制台输入对象(jline.console.ConsoleReader),然后它进入 while 循环,等待用户的输入。同时也调用 connectToZK 连接服务器并建立会话(session),在 connect 时创建 ZooKeeper 对象,在 ZooKeeper 的构造函数中会创建客户端使用的 NIO socket,并启动两个工作线程 sendThread 和 eventThread,两个线程被初始化为守护线程。
sendThread 的 run()是一个无限循环,除非运到了 close 的条件,否则他就会一直循环下去,比如向服务端发送心跳,或者向服务端发送我们在控制台输入的数据以及接受服务端发送过来的响应。
eventThread 线程负责队列事件和处理 watch。
客户端也会创建一个 clientCnxn,由 ClientCnxnSocketNIO.java 负责 IO 数据通信。
客户端的场景说明(事务、非事务请求类型)。
客户端源码解析
ZooKeeperMain 初始化
ZooKeeper 的构造函数,cnxn.start()会创建 sendThread 和 eventThread 守护线程。
在 ClientCnxn.java 中,有两个重要的数据结构。
/**
* These are the packets that have been sent and are waiting for a response.
*/
private final LinkedList<Packet> pendingQueue = new LinkedList<Packet>();
/**
* These are the packets that need to be sent.
*/
private final LinkedBlockingDeque<Packet> outgoingQueue = new LinkedBlockingDeque<Packet>();
ZooKeeper 类中的对用户的输入参数转换为对 ZK 操作,会调用 cnxn.submitRequest()提交请求,在 ClientCnxn.java 中会把请求封装为 Packet 并写入 outgoingQueue,待 sendThread 线程消费发送给服务端,对于同步接口,调用 cnxn.submitRequest()会阻塞,其中客户端等待是自旋锁。
ClientCnxnSocketNIO.java 主要是调用 dcIO(), 其中读就绪,读取服务端发送过来的数据,写就绪, 往客户端发送用户在控制台输入的命令。
从上面源码看,客户端在 cnxn.submitRequest(),会自旋等待服务端的结果,直到 Packet 的 finished 被设置为 true。ClientCnxnSocketNIO.java 调用 dcIO(),read 逻辑中,会调用 sendThread.readResponse(), 在 sendThread.readResponse()函数中的 finally 中调用 finshPacket()设置 finished 为 true,进而客户端阻塞解除,返回结果。
扩展阅读:
https://www.cnblogs.com/ZhuChangwu/p/11587615.html
会话(Session)
Client 建立会话的流程如下,
客户端源码分析
在 clientCnxn.java 中,run 是一个 while 循环,只要 client 没有被关闭会一直循环,每次循环判断当前 client 是否连接到 server,如果没有则发起连接,发起连接调用了 startConnect。
在 connect 是,传递了如下参数,
服务端源码分析
server 在启动后,会暴露给客户端连接的地址和端口以提供服务。我们先看一下NIOServerCnxnFactory,主要是启动三个线程。
client 发起 socket 连接的时候,server 监听了该端口,接收到 client 的连接请求,然后把建立练级的 SocketChannel 放入队列里面,交给 SelectorThread 处理。
SelectorThread 是一个不断循环的线程,每次循环都会处理刚刚建立的 socket 连接。
session 生成算法
监视(Watch)
本小节主要看看 ZooKeeper 怎么设置监视和监控点的通知。ZooKeeper 可以定义不同类型的通知,如监控 znode 的数据变化,监控 znode 子节点的变化,监控 znode 的创建或者删除。ZooKeeper 的服务端实现了监视点管理器(watch manager)。
一个 WatchManager 类的实例负责管理当前已经注册的监视点列表,并负责触发他们,监视点只会存在内存且为本地服务端的概念,所有类型的服务器都是使用同样的方式处理监控点。
DataTree 类中持有一个监视点管理器来负责子节点监控和数据的监控。
在服务端触发一个监视点,最终会传播到客户端,负责处理传播的为服务端的 cnxn 对象(ServerCnxn 类),此对象表示客户端和服务端的连接并实现了 Watcher 接口。Watch.process 方法序列化了监视点事件为一定的格式,以便于网络传送。ZooKeeper 客户端接收序列化的监视点事件,并将其反序列化为监控点事件的对象,并传递给应用程序。
以同步版本的 GetData 为例
Watcher 接口的定义,如果设置了监视点,我们要实现 process 函数。
客户端 Watcher 的 process()接口
客户端 watcher 实现
在客户端 GetData 时,如果注册 watch 监控点到服务端,在 watch 的 path 的 value 变化时,服务端会通知客户端该变化。
在客户端的 GetData 方法中(ZooKeeper 类的 GetData):
客户端 GetData()
客户端 submitRequest()
服务端如何处理 GetData 请求呢,由于是读请求,我们可以直接看 FinalRequestProcessor 处理器的 public void processRequest(Request request){}方法,看它针对 GetData()方式的请求做出了哪些动作。
zks.getZKDatabase().getData(getDataRequest.getPath(), stat, getDataRequest.getWatch() ? cnxn : null)
根据 watcher 的“有无”给服务端添加不同的 Watcher。服务端 GetData()函数,在服务端维护了一份 path+watcher 的 map,如果设置 watcher,服务端会保存该 path 的 watcher。
服务端 GetData()
服务端 GetData()
服务端 addWatch ()
为了测试服务端监视通知客户端,我们在客户端本地输入的命令,
set /path newValue
客户端 SetData()
从 SetData 的源码看,本次的 submitRequest 参数中,WatchRegistration==null,可以推断,服务端在 FinalRequestProcessor 中再处理时取出的 watcher==null,也就不会将 path+watcher 保存进 maptable 中,其他的处理过程和上面 GetData 类似。
服务端在满足触发监控点时,并通过 cnxn 的 process()方法处理(NIOServerCnxn 类)通知到客户端。在服务端处理的 SetData()函数看,Set 数值后,会触发 watch 回调,即 triggerWatch()。
服务端 SetData()
服务端 triggerWatch ()
本文系作者在时代Java发表,未经许可,不得转载。
如有侵权,请联系nowjava@qq.com删除。