【Original】大叔's Troubleshooting Notes (40): Kafka reassign stuck

For the details of how Kafka reassignment works, see the earlier post on the reassign process.

Recently a reassignment on our Kafka cluster got stuck. The problem unfolded as follows.
The problem log:

2021-07-16 10:35:41,193 INFO kafka.controller.KafkaController: [Controller id=3] 0/2 replicas have caught up with the leader for partition kafka-9 being reassigned. Replica(s) 3,2 still need to catch up

The corresponding code logic:

  case class PartitionReassignmentIsrChange(partition: TopicPartition) extends ControllerEvent {
    override def state: ControllerState = ControllerState.PartitionReassignment

    override def process(): Unit = {
      if (!isActive) return
      // check if this partition is still being reassigned or not
      controllerContext.partitionsBeingReassigned.get(partition).foreach { reassignedPartitionContext =>
        val reassignedReplicas = reassignedPartitionContext.newReplicas.toSet
        zkClient.getTopicPartitionStates(Seq(partition)).get(partition) match {
          case Some(leaderIsrAndControllerEpoch) => // check if new replicas have joined ISR
            val leaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
            val caughtUpReplicas = reassignedReplicas & leaderAndIsr.isr.toSet
            if (caughtUpReplicas == reassignedReplicas) {
              // resume the partition reassignment process
              info(s"${caughtUpReplicas.size}/${reassignedReplicas.size} replicas have caught up with the leader for " +
                s"partition $partition being reassigned. Resuming partition reassignment")
              onPartitionReassignment(partition, reassignedPartitionContext)
            }
            else {
              info(s"${caughtUpReplicas.size}/${reassignedReplicas.size} replicas have caught up with the leader for " +
                s"partition $partition being reassigned. Replica(s) " +
                s"${(reassignedReplicas -- leaderAndIsr.isr.toSet).mkString(",")} still need to catch up")
            }
          case None => error(s"Error handling reassignment of partition $partition to replicas " +
                         s"${reassignedReplicas.mkString(",")} as it was never created")
        }
      }
    }
  }

At this point the replicas planned by the reassignment are not in the ISR, so the controller can only wait for them to catch up. The problem is that it kept waiting forever.
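To make the numbers in the stuck-log line concrete, here is a minimal, self-contained Scala sketch of the same set arithmetic (the replica IDs mirror the log line above; everything else is made up for illustration, not actual Kafka code):

object CatchUpCheck extends App {
  // Target replicas from the reassignment plan, and the current ISR as read
  // from the partition state in ZooKeeper. Values mirror the log above:
  // replicas 3 and 2 are planned, but neither is in the ISR yet.
  val reassignedReplicas: Set[Int] = Set(3, 2)
  val isr: Set[Int] = Set(1)

  // Same condition the controller uses: every planned replica must be in the ISR.
  val caughtUpReplicas = reassignedReplicas & isr

  if (caughtUpReplicas == reassignedReplicas)
    println("all replicas caught up -> resume partition reassignment")
  else
    println(s"${caughtUpReplicas.size}/${reassignedReplicas.size} replicas have caught up. " +
      s"Replica(s) ${(reassignedReplicas -- isr).mkString(",")} still need to catch up")
}

With these values the output matches the "0/2 ... Replica(s) 3,2 still need to catch up" situation from the controller log; the controller takes no further action until the ISR actually changes.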

The log on the broker being waited for shows:

2021-07-16 10:35:41,150 WARN state.change.logger: [Broker id=3] Ignoring LeaderAndIsr request from controller 3 with correlation id 969 epoch 44 for partition kafka-9 as the local replica for the partition is in an offline log directory

The corresponding code:

        leaderAndIsrRequest.partitionStates.asScala.foreach { case (topicPartition, stateInfo) =>
          val partition = getOrCreatePartition(topicPartition)
          val partitionLeaderEpoch = partition.getLeaderEpoch
          if (partition eq ReplicaManager.OfflinePartition) {
            stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from " +
              s"controller $controllerId with correlation id $correlationId " +
              s"epoch $controllerEpoch for partition $topicPartition as the local replica for the " +
              "partition is in an offline log directory")
            responseMap.put(topicPartition, Errors.KAFKA_STORAGE_ERROR)
          }

Because the partition is in the offline state, the LeaderAndIsr request is not actually applied: the broker only records KAFKA_STORAGE_ERROR in its response and never starts fetching for that replica, so the replica never joins the ISR and the reassignment keeps waiting.

So why was the partition offline? Following the logs further, we find:

2021-07-16 00:25:17,836 INFO kafka.log.LogManager: Stopping serving logs in dir /data/kafka/data

and at the same time there were a large number of disk-full errors:

java.io.IOException: No space left on device

Because the disk had filled up earlier, Kafka marked that log directory as offline. Every partition whose log lives in this directory is then also marked offline, and LeaderAndIsr requests for those partitions are not executed.
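As a rough illustration of that chain (these are not the real Kafka classes; apart from kafka-9 and the /data/kafka/data path from the logs, the names and the dir-to-partition mapping are made up for this sketch), the effect of one failed log directory on every partition hosted in it looks like this:

import java.io.IOException
import scala.collection.mutable

object LogDirFailureSketch extends App {
  // Toy model of the mapping from log directory to the partitions stored there.
  val partitionsByDir = Map(
    "/data/kafka/data" -> Seq("kafka-9", "kafka-3"),
    "/data2/kafka/data" -> Seq("kafka-1"))
  val offlinePartitions = mutable.Set.empty[String]

  // A disk-full IOException while writing to a dir takes the whole dir offline,
  // and with it every partition whose log lives in that dir.
  def handleWriteError(dir: String, e: IOException): Unit = {
    println(s"Stopping serving logs in dir $dir (${e.getMessage})")
    offlinePartitions ++= partitionsByDir.getOrElse(dir, Seq.empty)
  }

  handleWriteError("/data/kafka/data", new IOException("No space left on device"))

  // LeaderAndIsr requests for these partitions are ignored until the broker is
  // restarted with a healthy directory.
  println(s"offline partitions: ${offlinePartitions.mkString(",")}")
}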

To locate this situation, search the controller log for 'still need to catch up' to see which brokers the reassignment is waiting on, then check on those brokers whether a 'Stopping serving logs' line appears after startup. If it does, the reassignment is stuck. The fix is to restart the waiting broker(s); once they come back, the reassignment can proceed.
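If you want to script that check, here is a rough sketch (the log file paths are only examples for a typical deployment; point them at wherever your controller and broker logs actually live):

import scala.io.Source

object StuckReassignCheck extends App {
  // Example locations only; adjust to your real controller/server log paths.
  val controllerLog = "/var/log/kafka/controller.log"
  val serverLog = "/var/log/kafka/server.log"

  def linesMatching(path: String, needle: String): Seq[String] = {
    val src = Source.fromFile(path)
    try src.getLines().filter(_.contains(needle)).toList
    finally src.close()
  }

  // Step 1: on the controller, list the partitions whose reassignment is waiting
  // and note the broker ids named after "Replica(s)".
  linesMatching(controllerLog, "still need to catch up").foreach(println)

  // Step 2: on each broker named there, check whether a log dir was taken
  // offline after startup; if so, restarting that broker unblocks the reassign.
  linesMatching(serverLog, "Stopping serving logs").foreach(println)
}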

————————————————————— The end. This is 大魔王先生's divider line :) —————————————————————

大魔王先生's abilities are limited, so there may be mistakes in this article; corrections and additions are welcome!
Thank you for reading. If this article was useful to you, please give 大魔王先生 a little like, ありがとう

