部门的开发同学最近在开发一个活动的过程中,需要关注大量的应用后台逻辑,捕捉各种事件的触发。在设计时打算采用Kafka消息队列进行业务逻辑的解耦,这样活动开发和后台开发同学的工作就分离开了。但是使用的同学不是很熟悉其原理,担心以下几个问题:这些问题都很正常,在开始接触和使用时总会有这样或那样的问题。一般情况下,不做了解,使用各种默认的推荐值,也是可以work的。但是我们要优雅的提升自己的姿(知)势(识)。学习其背后的原理,至少在遇到一般的问题时,能够分析和处理问题,做到心中有数。简单来说,有3个关键词, 异步、削峰、解耦。可以理解为:我做完了,后面的我不管了;
工作太多了,先放一放我慢慢处理;
怎么产生的我不管/怎么处理我不管。
用户提交评论中, 写入数据库后,存在需要捕捉评论事件的多个逻辑步骤。如果在接口处理过程中,顺序的处理不同的步骤,非常繁琐。我们可以批量的通知各个步骤(异步),无需返回直接处理当次的支付其他逻辑(解耦)。看起来就清爽多了。另外,消息队列也可以作为缓存暂存发出的消息,不再需要考虑调用各个步骤时时延逻辑的异常场景。附注:本文以讲解Kafka中的可靠性设计为例,其它消息队列的选型暂不涉及。在回答文章前面的问题之前,需要简单介绍一下各种概念。 Kafka从拓扑上分有如下角色:Consumer: 消费者,一般以API形式存在于各个业务svr中;
Producer: 生产者,一般以API形式存在于各个业务svr中;
Kafka broker: Kafka集群中的服务器,topic里的消息数据存在上面。
Producer采用发送push的方式将消息发到broker上,broker存储后。由consumer采用pull模式订阅并消费消息。
Topic:Kafka处理的消息的逻辑大类集合,可以理解为表。写入不同的topic即写入不同的表。
Partition: Topic下的物理分组,1个topic可以分为多个partition, 每个partition是一个有序的队列(大文件)。Partition中每一条消息都有一个有序的offset。
Msg: 消息,通信的基本单位。每个msg在topic下的不同partiton仅有一份,在partition中有一个唯一的offset用于定位。
Replica: 副本,partition的数据冗余备份,用于实现分布式的数据可靠性,但引入了不同副本间的数据一致性问题,带来了一定的复杂度。
Leader/follower: replica的角色,leader replica 用来提供该partition的读写服务。Follower 不停的从leader侧同步写入的消息。它们之间的消息状态采用一致性策略来解决。
为了方便后文更好的理解broker上的消息状态一致性策略,需要再简单介绍一下消息的存储格式。当Producer 发送一条消息到broker中, 会根据分配 partition 规则选择被存储到哪一个 partition, 如果 partition 规则设置的合理,消息会均匀的分布到不同的 partition 里,这样就实现了水平扩展。Pruducer可以认为partition是一个大的串行文件,msg存储时被分配一个唯一的offset。Offset是一个逻辑意义上的偏移,用于区分每一条消息。而partition本身作为文件,可以有多个多个副本replica(leader/follower)。多个replica分布在在不同的broker上。如果要回答如何在broker之间保证存储的消息和状态不会丢失,就要回答broker之间的各个replica的消息状态一致性如何解决,包括producer已经提交了哪些消息,哪些消息已经落地,哪些消息在节点故障后不会丢失。回到文章开头提到的几个问题,在使用Kafka消息队列做异步发送时,如何保证消息的可靠性?这里可以分为3个部分讲解。1. 生产者的可靠性保证
发消息之后有没有ack?
发消息收到ack后,是不是消息就不会丢失了?
而Kafka通过配置来指定producer生产者在发送消息时的ack策略:如果想实现Kafka配置为 CP(Consistency & Partition tolerance) 系统, 配置需要如下:
request.required.acks=-1
min.insync.replicas = ${N/2 + 1}
unclean.leader.election.enable = false
如图所示,在acks=-1 的情况下,新消息只有被ISR中的所有 follower(f1和f2, f3) 都从leader复制过去才会回ack, ack后,无论那种机器故障情况(全部或部分), 写入的msg4,都不会丢失, 消息状态满足一致性C 要求。
正常情况下,所有follower复制完成后,leader回producer ack。异常情况下,如果当数据发送到 leader后部分副本(f1和f2同步), leader挂了?此时任何 follower 都有可能变成新的 leader, producer 端会得到返回异常,producer 端会重新发送数据,但这样数据可能会重复(但不会丢失), 暂不考虑数据重复的情况。min.insync.replicas 参数用于保证当前集群中处于正常同步状态的副本follower数量,当实际值小于配置值时,集群停止服务。如果配置为 N/2+1, 即多一半的数量,则在满足此条件下,通过算法保证强一致性。当不满足配置数时,牺牲可用性即停服。异常情况下,leader挂掉,此时需要重新从follower选举leader。可以为f2或者f3。如果选举f3为新leader, 则可能会发生消息截断,因为f3还未同步msg4的数据。Kafka通过unclean.leader.election.enable来控制在这种情况下,是否可以选举f3为leader。旧版本中默认为true,在某个版本下已默认为false,避免这种情况下消息截断的出现。通过ack和min.insync.replicas和unclean.leader.election.enable的配合,保证在Kafka配置为CP系统时,要么不工作,要么得到ack后,消息不会丢失且消息状态一致。min.insync.replicas 参数默认值为1,即满足高可用性,只要有1台能工作即可。但此时可工作的broker状态不一定正确。如果想实现Kafka配置为AP(Availability & Partition tolerance)系统:
request.required.acks=1
min.insync.replicas = 1
unclean.leader.election.enable = false
当配置为acks=1 时,即leader接收消息后回ack,这时会出现消息丢失的问题:如果 leader接受到了 第4 条消息,此时还没有同步到 follower中,leader机器挂了,其中一个follower被选为 leader, 则 第 4 条消息丢失了。当然这个也需要unclean.leader.election.enable参数配置为false来配合。但是leader回ack的情况下,follower未同步的概率会大大提升。通过producer策略的配置和Kafka集群通用参数的配置,可以针对自己的业务系统特点来进行合理的参数配置,在通讯性能和消息可靠性下寻得某种平衡。2. Broker的可靠性保证
消息通过producer发送到broker之后,还会遇到很多问题:Partition leader 写入成功,follower什么时候同步?
Leader写入成功,消费者什么时候能读到这条消息?
Leader写入成功后,leader重启,重启后消息状态还正常嘛?
Leader重启,如何选举新的leader?
这些问题集中在:消息落到broker后,集群通过何种机制来保证不同副本间的消息状态一致性。3. Kafka消息备份和同步
Kafka通过分区的多副本策略来解决消息的备份问题。通过HW和LEO的标识,来对应ISR和OSR的概念,用于类比共识性算法解决数据同步一致性的问题。分区多副本即前文提到的Partition 的replica(副本) 分布在跟 partition 不相同的机器上, 通过数据冗余保证故障自动转移。而不同副本的状态形成了ISR和OSR的概念。ISR : leader 副本保持一定同步的 follower 副本, 包括 leader 副本自己,叫 In Sync Replica;
AR: 所有副本 (replicas) 统称为 assigned replicas, 即 AR;
OSR: follower 同 leader 同步数据有一些延迟的节点。
ISR是Kafka的同步策略中独有的概念,区别于raft等共识性算法。Raft要求集群中要求N/2+1台正常,其在这种条件下通过复杂的算法保证选举出的新leader符合一致性状态。而kafka的ISR同步策略,通过ISR列表的可伸缩性和HW&LEO更新,一定程度上解决了消息一致性和吞吐性能之间的平衡。 HW: Highwatermark, 俗称高水位,它表示了一个特定的消息偏移量(offset), 在一个parttion中consumer只能拉取这个 offset 之前的消息(此 offset 跟 consumer offset 不是一个概念) ;
LEO: LogEndOffset, 日志末端偏移量, 用来表示当前日志文件中下一条写入消息的offset;
leader HW: 该Partititon所有副本的LEO最小值;
follower HW: min(follower自身LEO 和 leader HW);
Leader HW = 所有副本LEO最小值;
- Follower HW = min(follower 自身 LEO 和 leader HW);
Leader不仅保存了自己的HW & LEO,还保存了远端副本的HW & LEO。简单来说,每个副本都有HW和LEO的存储,而leader不但保存自己的HW和LEO, 还保存了每个远端副本的LEO,用于在自身的HW更新时计算值。可以看出由于LEO远端存储的特性,其实会导致副本真实的LEO和leader存储的LEO有短暂的数值差异,这会带来一些问题,下文也会展开讲述。 | Follower从leader副本拉取消息,写入磁盘后,更新LEO值 |
| Leader收到producer消息,写入磁盘后,更新LEO值 |
| Follower fech时带上自己的LEO, leader使用这个值更新远程LEO |
| followerfetch成功更新LEO后,比较leader发来的hw和自己的hw,取较小值 |
| Leader更新LEO之后,更新完远程LEO之后,取所有副本的最小LEO |
一次完整的写请求的HW / LEO更新流程如下图所示:Leader 所有的 HW&LEO都为0, follower 与 leader 建立连接,follower fetch leader, follower 所有 HW & LEO 都为0Producer 发来一条消息到 leader, 此时 leader 的 LEO=1, follower 带着自己的 HW&LEO(都为0) 开始 fetch, leader的 HW=min(all follower LEO)=0, leader 记录follower的LEO=0;follower 拉取到一条消息,带着消息和leader的 HW(0)&LEO(1)返回自身更新自己的LEO=1, 更新自己的HW=min(follower 自身 LEO(1) 和 leader HW(0))=0Follower带着自己的 HW(0)&LEO(1) 去请求leader .此时leader 的HW更新为1,leader 保存的follower的 LEO更新为1,带着leader 的 HW(1)&LEO(1)返回自身,更新自己的 HW&LEO此时回到刚才提到的问题,这种HW和LEO更新策略有个很明显的问题,即follower的HW更新需要follower的2轮fetch中的leader返回才能更新,而Leader的HW已更新。在这之间,如果follower和leader的节点发生故障,则follower的HW和leader的HW会处于不一致状态,带来比较多的一致性问题。比如如下场景:Leader更新完分区HW后,follower HW还未更新,此时follower重启;
Follower重启后,LEO设置为之前的follower HW值(0), 此时发生消息截断(临时状态);
Follower重新同步leader, 此时leader宕机,则不选举则不可用;
Follower被选举为leader, 则msg 1 永久丢失了。
在Kafka配置为AP系统的情况下,由于min.insync.replicas为1,这种重启后follower发生截断发生的概率会大大提升, 而在多个副本存在的情况下,情况可能还会更加糟糕。而kafka新版本为了解决这个HW&LEO的同步机制更新缺陷,引入了Epoch的概念。Leader epoch(1, 120) 说明这个leader 的版本号为1,版本的起始位置是 第120条消息开始的Kafka Broker 会在内存中为每个分区都缓存 Leader Epoch 数据,同时它还会定期地将这些信息持久化到一个 checkpoint 文件中。当 Leader 副本写入消息到磁盘时,Broker 会尝试更新这部分缓存。如果该 Leader 是首次写入消息,那么 Broker 会向缓存中增加一个 Leader Epoch 条目,否则就不做更新。这样,每次有 Leader 变更时,新的 Leader 副本会查询这部分缓存,取出对应的 Leader Epoch 的起始位移,以避免数据丢失和不一致的情况。Kafka通过ISR的同步机制及优化策略,用 HW & LEO的方式很好的确保了数据不丢失以及吞吐率。而ISR的管理最终都会反馈到Zookeeper上,其实现和leader的选举策略不再赘述。
本文系作者在时代Java发表,未经许可,不得转载。
如有侵权,请联系nowjava@qq.com删除。