最全Kafka知识宝典之数据可靠性深度剖析
一、生产者保证消息可靠性
Kafka有两个很重要的配置参数,acks和min.insync.replicas,其中acks是producer的配置参数,min.insync.replicas是Broker端的配置参数,这两个参数对于生产者不丢失数据起到了很大的作用
在正式讲解之前,我们要先聊一聊分区当中的副本
在kafka中副本是有一个leader节点和多个follower节点组成,leader节点负责接收消息和消费消息
follower既不提供写服务也不提供读服务,仅仅用于同步leader副本的消息,follower副本的唯一作用就是当leader副本出现问题时,通过ZooKeeper 提供的监控功能能够实时感知到,并立即开启新一轮的领导者选举,从追随者副本中选一个作为新的领导者。
副本角色
副本主要由三种角色组成,leader有单独的线程定期检查|SR中follower是否脱离ISR,如果发现ISR变化,则会将新的ISR的信息返回到Zookeeper的相关节点中
- AR(assignedreplica):所有副本的统称,AR=ISR+OSR
- ISR(in·sync Replica):同步中的副本,可以参与leader选主,一旦落后太多会被踢到OSR,默认落后10s后就会踢到OSR中。replica.lag.time.max.ms可以指定一个时间值,默认是10000ms。这个时间后就踢出ISR
- OSR(Out-SyncRelipcas):踢出同步的副本,一直追赶leader,追上后会进入ISR
如何检测落后太多?
1、心跳检测:
- Kafka 会定期检查每个副本的心跳,确保副本与 Leader 保持连接。
- 如果某个副本在
replica.lag.time.max.ms
时间内没有向 Leader 发送心跳或获取到新的消息,就会被认为落后太多。
2、日志同步:
- Kafka 会定期检查每个副本的日志同步情况。
- 如果某个副本在
replica.lag.time.max.ms
时间内没有同步到 Leader 的最新日志,也会被认为落后太多。
ACKS确认机制
acks参数指定了必须要有多少个分区副本收到消息,生产者才认为该消息是写入成功的,这个参数对于消息是否可能丢失起着非常重要作用,配置acks,我们可以通过配置acks=【0、1、a11】来实现。
acks=0
:生产者不会等待任何确认,消息可能丢失。acks=1
:生产者只等待leader确认,但不等待所有副本确认。acks=all
:生产者等待所有副本确认(这里所有指的是ISR中的副本),确保消息不会丢失。
面试题:如果一个节点挂掉,就会送ISR中踢出,但是acks=all又是针对ISR集合的。那如果所有的副本全部挂掉并被移除ISR集合,只剩下一个leader副本,那么acks=all和acks=1还有什么区别呢?
回答:在这种情况下,acks=all
和 acks=1
的行为实际上是相同的。因为 ISR 集合中只有一个 Leader 副本,所以无论是 acks=all
还是 acks=1
,生产者都只需要等待 Leader 副本的确认。然而,这种情况下,如果 Leader 副本在确认消息后立即挂掉,消息可能会丢失,因为没有其他副本可以恢复。
所以一旦Leader节点宕机,那么数据是无法恢复的,这样会给系统的数据一致性问题造成很大的影响,不可以保证系统的一致性。
面试官追问:既然在这里acks=all和acks=1效果一样,那你如何解决这个问题呢?
回答:在Kafka里有一个最小同步副本机制可以解决,min.insync.replicas
最小同步副本min.insync.replicas机制
这个参数定义了在ISR集合中,必须有多少个副本确认接收到消息,生产者才能认为消息发送成功。所以能很好地解决只有Leader一个副本存在于ISR的情况
通过命令行设置这个参数
kafka-configs.sh --alter --topic my-topic --add-config min.insync.replicas=2 --bootstrap-server localhost:9092
有两个问题
1、如果设定的acks=1,但是 min.insync.replicas设定超过1,比如3,那么以acks为准,即只需一台副本写入成功就为成功,所以这两个值在设定的时候要注意
2、这个值设定超过1的话消费者端会报错,因为这个设定是整个broker的设定,消费者端有一个topic是__consumer_offsets,为了避免报错,可以设定这个topic的副本数量大于等于最小同步副本数量,即KAFKA_OFFSETS_TOPIC_PEPLICATION_FACTOR=3
生产者重试
当生产者收到服务端的错误之后,这个错误可能是暂时的(比如分区缺少leader),在这种情况下,通过设置retries参数的值将控制生产者在放弃发送消息自强进行重试的次数。
默认情况下,生产者将等待重试的时间间隔为100ms,但是你可以使用retry.backoff.ms参数来控制重试的时间,默认重试次数为Integer.MAX_VALUE。
建议对broker的选举恢复时间进行测试。并设置重试次数和重试间隔时间,使重试花费的总时间大于kafka集群的故障恢复时间,否则生产者可能过早放弃消息
并不是所有的错误都能够进行重试,有些错误不是暂时性的,此类错误不建议重试(如消息太大的错误),通常由于生产者为你处理重试,所以在你的应用程序逻辑中自定义重试将没用任何意义,你最好是将精力放在处理不可重试的错误或者失败的情况上面。
二、副本同步机制
首先要了解三个基本概念,LEO、Remote LEO、HW,其实这三个就把他理解为指针的概念
LEO
LEO 表示当前日志文件中下一条待写入的消息的偏移量(offset)。换句话说,它是当前分区中最新消息的偏移量加 1。通俗理解为,指向了我期待收到的下一条消息的序号
Remote LEO
Remote LEO 是 Leader 副本上存储的 Follower 副本的 LEO 值。它表示 Follower 副本当前日志的末端位置。
HW
HW 是一个重要的复制进度指标,表示分区中的消息复制进度。它表示在分区中的消息日志中,已经被成功复制到所有 ISR中的消息的位置。通俗理解就是如果现在要消费消息,哪些消息是可读的,这就是一个可读指针。
HW 是 ISR 中最小的 LEO 值。这意味着 HW 表示 ISR 中所有副本都已经复制的消息的偏移量。举个例子,当前ISR中有1个leader,2个follower,现在leader的leo值为8,但是remote leo 1为8,表示follower1已经同步好了,remote leo 2值为4,表示follower2只同步到了4,所以对于整个ISR,可读的消息只有4之前的,4之后的消息还没同步完全(follower2还没有同步)
Leader 副本通过比较 Remote LEO 和自己的 LEO 来判断 Follower 副本的同步状态。
HW 是 ISR 中最小的 LEO 值,因此 Leader 副本会根据 ISR 中所有副本的 Remote LEO 来更新 HW。
LEO更新时机
- leader 副本自身的 LEO 值更新:在 Producer 消息发送过来时,即 leader 副本当前最新存储的消息位移位置 +1;
- follower 副本自身的LEO 值更新:从 leader 副本中 fetch 到消息并写到本地日志文件时,即 follower 副本当前同步 leader副本最新的消息位移位置 +1;
- leader 副本中的 remote LEO值更新:每次 follower副本发送 fetch 请求都会包含 follower当前LEO值,leader拿到该值就会尝试更新 remote LEO值。
HW更新时机
故障时更新
- 副本被选为 leader 副本时:当某个 follower 副本被选为分区的 leader 副本时,kafka 就会尝试更新 HW 值;
- 副本被踢出 SR 时:如果某个副本追不上leader 副本进度,或者所在 broker 崩溃了,导致被踢出 ISR,leader 也会检查 HW值是否需要更新,毕竟 HW 值更新只跟处于 ISR 的副本 LEO 有关系。在最新的ISR中选出LEO最小值
正常时更新
- producer 向leader 副本写入消息时:在消息写入时会更新leader LEO值,因此需要再检査是否需要更新 HW 值;
- leader 处理 follower fetch请求时:follower 的 fetch 请求会携带 LEO 值,leader 会根据这个值更新对应的 remote LEO值,同时也需要检查是否需要更新 HW 值
follower HW 更新
- follower 更新 HW 发生在其更新 LEO 之后,每次 follower fetch 响应体都会包含 leader 的 HW 值,然后比较当前 LEO 值取最小的作为新的 HW 值。
副本同步过程模拟
初始状态
leader 和 follower 副本处于初始化值,follower 副本发送 fetch 请求,由于leader 副本没有数据,因此不会进行同步操作
写入消息
生产者发送了消息m1到分区leader副本,写入该条消息后leader更新LEO = 1
同步消息
follower 发送 fetch 请求,槜带当前最新的 offset=0,leader 处理 fetch 请求时,更新 remote LEO=0,对比 LEO 值最小为0,所以HW=0
leader 副本响应消息数据及 leader HW=0给 follower,follower 写入消息后,更新LEO 值,同时对比 leader HW 值,取最小的作为新的 HW 值,此时 follower HW=0,这也意味着,follower HW 是不会超过 leader HW 值的。
最后同步
follower 发送第二轮 fetch 请求,携带当前最新的 offset=1,leader 处理fetch 请求时,更新 remote LEO=1,对比 LEO值最小为 1,所以 HW=1,
此时 leader 没有新的消息数据,所以直接返回 leader HW=1给 follower,follower 对比当前最新的 LEO 值与 leader HW值,取最小的作为新的HW值,此时follower Hw=1
副本同步过程存在的问题
数据丢失
第二阶段同步
当原来的主节点恢复后会变成从节点,然后从Leader节点同步数据,但是因为新选出来的Leader的LEO是1,从Leader同步消息的时候为了保证消息一致性会截断Follower中的消息
数据不一致
初始情况
假如初始情况下我们已经有了三条数据现在就要进行数据同步,leader已经完成了HW的更新但是还没有同步到Follower 中
服务器宕机
这个时候不是一台服务器宕机,而是全部宕机
follower节点恢复
接下来节点恢复的时候假如因为某些原因Follower 节点先恢复了,这个时候follower 会选为leader ,原来的1eader 会便为 follower,并且恢复后为了数据一致性会进行数据截断
数据写入
当集群有了 Leader 节点后这个时候数据就可以进行写入了,我们在Leader 节点写入了 m4
在只有一个节点恢复的情况下,ISR列表只有Leader,所以数据是可以写入成功的
三、消息投递语义
最多一次(At-Most-Once)
- 定义:消息可能丢失,但不会重复。
- 实现方式:
- 生产者设置
acks=0
,即生产者不等待任何确认直接发送下一条消息。 - 消费者在读取消息后立即提交偏移量(Offset),而不等待消息处理完成。
- 生产者设置
- 适用场景:
- 对消息丢失容忍度较高的场景,如日志收集、监控数据等。
至少一次(At-Least-Once)
- 定义:消息不会丢失,但可能会重复。
- 实现方式:
- 生产者设置
acks=all
,即生产者等待所有 ISR 副本确认后再发送下一条消息。 - 消费者在消息处理完成后才提交偏移量。
- 生产者设置
- 适用场景:
- 对消息丢失容忍度较低,但可以接受消息重复的场景,如计数器更新、订单处理等。
精确一次(Exactly-Once)
- 定义:消息既不会丢失也不会重复,每条消息只会被处理一次。
- 实现方式:
- 生产者设置
acks=all
,并启用幂等性(enable.idempotence=true)。 - 消费者使用事务(Transactions)和读写分离(Read-Committed)模式。
- 生产者设置
- 适用场景:
- 对消息投递严格要求的场景,如金融交易、关键业务流程等。