阿里 P7 三面,kafka Borker 日志持久化没答上

👏作者简介:大家好,我是爱敲代码的小黄,阿里巴巴淘天Java开发工程师

📕系列专栏:Spring源码、Netty源码、Kafka源码、JUC源码、dubbo源码系列

🔥如果感觉博主的文章还不错的话,请👍三连支持👍一下博主哦

🍂博主正在努力完成2023计划中:以梦为马,扬帆起航,2023追梦人

阿里 P7 三面凉凉,kafka Borker 日志持久化没答上来

一、引言

前段时间有个朋友,去面了阿里集团的P7岗位,很遗憾的是三面没有过

其中有一个 kafkaBorker 日志如何持久化的问题没有答上来

今天正好写一篇源码文章给朋友复盘一下

虽然现在是互联网寒冬,但乾坤未定,你我皆是黑马!

废话不多说,发车!

二、日志原理介绍

在讲 Kafka 日志源码之前,我们要先对 Kafka 日志有一个大体的认识

这也是阅读源码的关键,一步一步来

前面我们聊到了 Kafka 的生产端的整体架构

可以看到,我们每一个 Topic 都可以分为多个 Partition ,而每一个 Partition 对应着一个 Log

但这里会存在两个问题,如果我们的数据过大

  • 一个 Log 能装下吗?
  • 就算能装下,插入/查询速度怎么保证?

所以,Kafka 在这里引入了日志分段(LogSegment )的概念,将一个 Log 切割成多个 LogSegment 进行存储

实际上,这里的 LogLogSegment 并不是纯粹的物理意义上的概念

  • Log 对应的文件夹
  • LogSegment 对应磁盘上的一个日志文件和两个索引文件 日志文件:以 .log 为文件后缀两个索引文件: 偏移量索引文件(以 .index为文件后缀)时间戳索引文件(以.timeindex为文件后缀)

这里有个重点要记一下:每个 LogSegment 都有一个基准偏移量 baseOffset,用来表示当前 LogSegment 第一条消息的 offset

日志和索引文件命名都是按照基准偏移量进行命名,所以日志整体架构如下:

这里我们简单介绍下这个日志是怎么搜索的,后面会深入源码细聊

二、日志源码

我们回顾一下上篇文章的整体流程图:

我们可以看到,消息的处理是通过 KafkaApis 来进行的,日志持久化通过 case ApiKeys.PRODUCE => handleProduceRequest(request)

本篇我们也围绕这个方法展开

1、授权校验

def handleProduceRequest(request: RequestChannel.Request) {

  // authorizedRequestInfo:存储通过授权验证的主题分区和对应的内存记录。
  val authorizedRequestInfo = mutable.Map[TopicPartition, MemoryRecords]()
  for ((topicPartition, memoryRecords) <- produceRequest.partitionRecordsOrFail.asScala) {
      if (!authorize(request.session, Write, Resource(Topic, topicPartition.topic, LITERAL)))
    		// 未授权的
        unauthorizedTopicResponses += topicPartition -> new PartitionResponse(Errors.TOPIC_AUTHORIZATION_FAILED)
      else if (!metadataCache.contains(topicPartition))
        nonExistingTopicResponses += topicPartition -> new PartitionResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION)
      else
        try {
          // 授权的
          ProduceRequest.validateRecords(request.header.apiVersion(), memoryRecords)
          authorizedRequestInfo += (topicPartition -> memoryRecords)
        } catch {
          case e: ApiException =>
            invalidRequestResponses += topicPartition -> new PartitionResponse(Errors.forException(e))
        }
    }
}

2、消息添加

  • 【重点】timeout:超时时间
  • 【重点】requiredAcks:指定了在记录追加到副本后需要多少个副本进行确认,才认为写操作成功 0: 不需要任何副本的确认1: 只需要主副本确认-1 或 all: 需要所有副本的确认
  • internalTopicsAllowed:是否允许将记录追加到内部主题
  • isFromClient:请求是否来自客户端
  • 【重点】entriesPerPartition:包含了通过授权验证的主题分区和对应的内存记录
  • responseCallback:回调函数,在记录追加完成后,会调用该回调函数发送响应给客户端。
  • recordConversionStatsCallback:处理记录转换统计信息的逻辑
replicaManager.appendRecords(
        timeout = produceRequest.timeout.toLong,
        requiredAcks = produceRequest.acks,
        internalTopicsAllowed = internalTopicsAllowed,
        isFromClient = true,
        entriesPerPartition = authorizedRequestInfo,
        responseCallback = sendResponseCallback,
        recordConversionStatsCallback = processingStatsCallback)

我们主要关心这三个参数即可:timeoutrequiredAcksentriesPerPartition,其余的目前不太重要

def appendRecords(timeout: Long,
                  requiredAcks: Short,
                  internalTopicsAllowed: Boolean,
                  isFromClient: Boolean,
                  entriesPerPartition: Map[TopicPartition, MemoryRecords],
                  responseCallback: Map[TopicPartition, PartitionResponse] => Unit,
                  delayedProduceLock: Option[Lock] = None,
                  recordConversionStatsCallback: Map[TopicPartition, RecordConversionStats] => Unit = _ => ()) {
   // 校验当前的ACK
   if (isValidRequiredAcks(requiredAcks)) {
      // 记录起始时间
      val sTime = time.milliseconds
      // 追加本地日志
      val localProduceResults = appendToLocalLog(internalTopicsAllowed = internalTopicsAllowed,
        isFromClient = isFromClient, entriesPerPartition, requiredAcks)
   }
}

// 允许当前的ACK为1、0、-1
private def isValidRequiredAcks(requiredAcks: Short): Boolean = {
  requiredAcks == -1 || requiredAcks == 1 || requiredAcks == 0
}

这里的追加本地日志就是我们本篇的重点

2.1 获取 Partition

private def appendToLocalLog(internalTopicsAllowed: Boolean,
                             isFromClient: Boolean,
                             entriesPerPartition: Map[TopicPartition, MemoryRecords],
                             requiredAcks: Short): Map[TopicPartition, LogAppendResult] = {
  val partition = getPartitionOrException(topicPartition, expectLeader = true)
}

// 根据给定的主题分区获取对应的分区对象
def getPartitionOrException(topicPartition: TopicPartition, expectLeader: Boolean): Partition = {
   	// 获取Partition并匹配
    getPartition(topicPartition) match {
      case Some(partition) =>
        if (partition eq ReplicaManager.OfflinePartition)
          throw new KafkaStorageException()
        else
          partition
      case None if metadataCache.contains(topicPartition) =>
        if (expectLeader) {
          throw new NotLeaderForPartitionException()
        } else {
          throw new ReplicaNotAvailableException()
        }
    }
  }

2.2 向 Leader 追加日志

val info = partition.appendRecordsToLeader(records, isFromClient, requiredAcks);

def appendRecordsToLeader(records: MemoryRecords, isFromClient: Boolean, requiredAcks: Int = 0): LogAppendInfo = {
     val info = log.appendAsLeader(records, leaderEpoch = this.leaderEpoch, isFromClient,
            interBrokerProtocolVersion)
}

def appendAsLeader(records: MemoryRecords, leaderEpoch: Int, isFromClient: Boolean = true,
                     interBrokerProtocolVersion: ApiVersion = ApiVersion.latestVersion): LogAppendInfo = {
    append(records, isFromClient, interBrokerProtocolVersion, assignOffsets = true, leaderEpoch)
  }

2.2.1 是否创建 segment

这里就到了我们一开始图中的 LogSegment

 val segment = maybeRoll(validRecords.sizeInBytes, appendInfo);

 private def maybeRoll(messagesSize: Int, appendInfo: LogAppendInfo): LogSegment = {
   	// 如果应该滚动,创建一个新的segment
    // 反之,则返回当前的segment
    if (segment.shouldRoll(RollParams(config, appendInfo, messagesSize, now))) {
      appendInfo.firstOffset match {
        case Some(firstOffset) => roll(Some(firstOffset))
        case None => roll(Some(maxOffsetInMessages - Integer.MAX_VALUE))
      }
    } else {
      segment
    }
 }

一共有六个条件,触发这六个条件,就会重新创建一个 segment

  • timeWaitedForRoll(rollParams.now, rollParams.maxTimestampInMessages) > rollParams.maxSegmentMs - rollJitterMs :判断时间等待是不是超时
  • size > rollParams.maxSegmentBytes - rollParams.messagesSize:当前 segment 是否有充足的空间存储当前信息
  • size > 0 && reachedRollMs :当前日志段的大小大于0,并且达到了进行日志分段的时间条件reachedRollMs
  • offsetIndex.isFull :偏移索引满了
  • timeIndex.isFull:时间戳索引满了
  • !canConvertToRelativeOffset(rollParams.maxOffsetInMessages):无法进行相对偏移的转换操作
class LogSegment private[log] (val log: FileRecords,
                               val offsetIndex: OffsetIndex,
                               val timeIndex: TimeIndex,
                               val txnIndex: TransactionIndex,
                               val baseOffset: Long,
                               val indexIntervalBytes: Int,
                               val rollJitterMs: Long,
                               val time: Time) extends Logging {

  def shouldRoll(rollParams: RollParams): Boolean = {
    val reachedRollMs = 
    timeWaitedForRoll(rollParams.now, rollParams.maxTimestampInMessages) >    rollParams.maxSegmentMs - rollJitterMs
    size > rollParams.maxSegmentBytes - rollParams.messagesSize ||
      (size > 0 && reachedRollMs) ||
      offsetIndex.isFull || timeIndex.isFull || !canConvertToRelativeOffset(rollParams.maxOffsetInMessages)
}

整体来看,六个条件也比较简单,我们继续往后看

2.2.2 创建 segment

appendInfo.firstOffset match {
  // 存在第一个偏移量
  case Some(firstOffset) => roll(Some(firstOffset))
  // 不存在第一个偏移量
  case None => roll(Some(maxOffsetInMessages - Integer.MAX_VALUE))
}

2.2.2.1 文件路径校验
def roll(expectedNextOffset: Option[Long] = None): LogSegment = {
  
  // 获取最新的offset
  val newOffset = math.max(expectedNextOffset.getOrElse(0L), logEndOffset)
  // 获取日志文件路径
  val logFile = Log.logFile(dir, newOffset)
  // 获取偏移量索引文件路径
  val offsetIdxFile = offsetIndexFile(dir, newOffset)
  // 获取时间戳索引文件路径
  val timeIdxFile = timeIndexFile(dir, newOffset)
  // 获取事务索引文件路径
  val txnIdxFile = transactionIndexFile(dir, newOffset)
  
  // 对路径列表进行遍历,如果文件存在,则将其删除。
  for (file <- List(logFile, offsetIdxFile, timeIdxFile, txnIdxFile) if file.exists) {
    Files.delete(file.toPath)
  }
}

2.2.2.2 segment 参数
  • dir:日志段所在的目录
  • baseOffset:日志段的基准偏移量
  • config:日志的配置信息
  • time:时间对象,用于处理时间相关的操作。
  • fileAlreadyExists:指示日志文件是否已经存在
  • initFileSize:初始文件大小
  • preallocate:是否预分配文件空间
  • fileSuffix:文件后缀
val segment = LogSegment.open(dir,
  baseOffset = newOffset,
  config,
  time = time,
  fileAlreadyExists = false,
  initFileSize = initFileSize,
  preallocate = config.preallocate)

2.2.2.3 生成 segment
new LogSegment(
  // 生成日志文件
  FileRecords.open(Log.logFile(dir, baseOffset, fileSuffix), fileAlreadyExists, initFileSize, preallocate),
  // 生成偏移量索引
  new OffsetIndex(Log.offsetIndexFile(dir, baseOffset, fileSuffix), baseOffset = baseOffset, maxIndexSize = maxIndexSize),
  // 生成时间戳索引
  new TimeIndex(Log.timeIndexFile(dir, baseOffset, fileSuffix), baseOffset = baseOffset, maxIndexSize = maxIndexSize),
  // 生成事务索引
  new TransactionIndex(baseOffset, Log.transactionIndexFile(dir, baseOffset, fileSuffix)),
  // 基准偏移量
  baseOffset,
  indexIntervalBytes = config.indexInterval,
  rollJitterMs = config.randomSegmentJitter,
  time)

这里有一个重点需要关注一下,那就是 mmap 的零拷贝

OffsetIndexTimeIndex 他们继承 AbstractIndex ,而 AbstractIndex 中使用 mmp 作为 buffer

class OffsetIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writable: Boolean = true) extends AbstractIndex[Long, Int](_file, baseOffset, maxIndexSize, writable) 
    

abstract class AbstractIndex{
   protected var mmap: MappedByteBuffer = {};
}

另外,这里先提一个知识点,后面会专门写一篇文章来分析一下

我们索引在查询的时候,采用的是二分查找的方式,这会导致 缺页中断,于是 kafka 将二分查找进行改进,将索引区分为 冷区 和 热区,分别搜索,尽可能保证热区的页在 Page Cache 里面,从而避免缺页中断。

当我们的 segment 生成完之后,就返回了

2.2.3 向 segment 添加日志

segment.append(largestOffset = appendInfo.lastOffset,
          largestTimestamp = appendInfo.maxTimestamp,
          shallowOffsetOfMaxTimestamp = appendInfo.offsetOfMaxTimestamp,
          records = validRecords)

def append(largestOffset: Long,
             largestTimestamp: Long,
             shallowOffsetOfMaxTimestamp: Long,
             records: MemoryRecords): Unit = {
  if (records.sizeInBytes > 0) {
    	// 添加日志
      val appendedBytes = log.append(records)
  }
  // 当累加超过多少时,才会进行索引的写入
  // indexIntervalBytes 默认 1048576 字节(1MB)
  if (bytesSinceLastIndexEntry > indexIntervalBytes) {
    // 添加偏移量索引
    offsetIndex.append(largestOffset, physicalPosition)
    // 添加时间戳索引
    timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestamp)
    // 归0
    bytesSinceLastIndexEntry = 0
  }
  // 累加
  bytesSinceLastIndexEntry += records.sizeInBytes
}

// lastOffset + 1
updateLogEndOffset(appendInfo.lastOffset + 1)

2.2.3.1 稀疏索引

kafka 中的偏移量索引和时间戳索引都属于稀疏索引

何为稀疏索引?

正常来说,我们会为每一个日志都创建一个索引,比如:

日志  索引
1     1
2     2
3     3
4     4

但这种方式比较浪费,于是采用稀疏索引,如下:

日志  索引
1     1
2			
3			
4			
5     2
6
7
8

当我们根据偏移量索引查询 1 时,可以查询到日志为 1 的,然后往下遍历搜索想要的即可。

2.2.3.2 偏移量索引
offsetIndex.append(largestOffset, physicalPosition)
 
def append(offset: Long, position: Int) {
  inLock(lock) {
    // 索引位置
    mmap.putInt(relativeOffset(offset))
    // 日志位置
    mmap.putInt(position)
    _entries += 1
    _lastOffset = offset
  }
}

// 用当前offset减去基准offset
def relativeOffset(offset: Long): Int = {
  val relativeOffset = offset - baseOffset
}

2.2.3.3 时间戳索引
timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestamp)

def maybeAppend(timestamp: Long, offset: Long, skipFullCheck: Boolean = false) {
    inLock(lock) {
      if (timestamp > lastEntry.timestamp) {
        // 添加时间戳
        mmap.putLong(timestamp)
        // 添加相对位移(偏移量索引)
        mmap.putInt(relativeOffset(offset))
        _entries += 1
        _lastEntry = TimestampOffset(timestamp, offset)
      }
    }
}

2.2.3.4 索引总结

我们的偏移量索引如图下所示:

  • 当我们查询一个消息时,比如消息位移为 23 的 根据二分查找找到偏移量索引下标 22利用上述我们偏移量 Map 的存储,得到其日志位置 RecordBatch:firstOffset=23 position=762再根据日志位置,找到真正存储日志的地方

我们的时间戳索引如图下所示:

  • 基本和我们的偏移量索引类似,只是增加了一层二分查找

2.2.4 flush刷新

在我们前面添加完之后,我们的数据仅仅是写到 PageCache 里面,需要进行 flush 将其刷新到磁盘中

// 未刷新消息数(unflushedMessages)超过配置的刷新间隔(flushInterval)
if (unflushedMessages >= config.flushInterval){
  flush()
}

def flush() {
    LogFlushStats.logFlushTimer.time {
      // 日志刷新
      log.flush()
      // 偏移量索引刷新
      offsetIndex.flush()
      // 时间戳索引刷新
      timeIndex.flush()
      // 事务索引刷新
      txnIndex.flush()
    }
  }

2.3 Follow 获取日志

同样,我们的 Follow 在获取日志时,和我们 Leader 添加日志时一样的方法

三、流程图

四、总结

这一篇我们介绍了 Kafka 中日志时如何持久化的以及 Kafka 日志中包括什么数据

鲁迅先生曾说:独行难,众行易,和志同道合的人一起进步。彼此毫无保留的分享经验,才是对抗互联网寒冬的最佳选择。

其实很多时候,并不是我们不够努力,很可能就是自己努力的方向不对,如果有一个人能稍微指点你一下,你真的可能会少走几年弯路。

我是爱敲代码的小黄,阿里巴巴淘天集团Java开发工程师,双非二本,培训班出身

通过两年努力,成功拿下阿里、百度、美团、滴滴等大厂,想通过自己的事迹告诉大家,努力是会有收获的!

双非本两年经验,我是如何拿下阿里、百度、美团、滴滴、快手、拼多多等大厂offer的?

我们下期再见。

从清晨走过,也拥抱夜晚的星辰,人生没有捷径,你我皆平凡,你好,陌生人,一起共勉。

#面试##Java##校招##社招##kafka#
全部评论

相关推荐

03-31 21:47
东南大学 C++
彭于晏前来求offe...:吓晕了
点赞 评论 收藏
分享
04-24 13:51
已编辑
西安电子科技大学 Java
👋个人背景:211计算机混子,代码能力一般,春招急头白脸参加央国企最后拿下这两个offer👏offer1:中广核工程公司驻陆丰仪控调试,待遇19+4,离家1800km💯offer2:张家口卷烟厂待遇未知,应该有13个(猜测),离家500km牛油们帮忙选一下,家里人不是很喜欢卷烟厂这个offer,但是蜀黍烟草局下岸了
鸿雁于飞:先说offer1:中广核工程公司驻陆丰仪控调试(待遇19+4) 中广核这艘央企大船还是很稳的,集团综合效益稳居央企前列。但你得搞清楚,这个19+4的"19"是总包,不是到手数——招聘宣传待遇里把所有能算的都算进去了,饭卡福利积分啥的全包含,有牛油分享实际到手大概打七折。试用期到手可能就四五千的水平,转正后基本工资4800左右,其余靠绩效、年终、大修费撑着。不过核电的工作环境有点"牢笼感"——核电站位置偏僻,远离繁华都市。工程公司是承包商性质,干活比业主公司累,而且大概率要经常出差,有的岗位年出差天数100天以上。最大问题是你这1800km的距离过于离谱,核电员工工作强度最小的时候一周也就回一次家,离得远回家成本高,夫妻感情和亲子关系都是现实考验。说白了:高薪是拿青春和生活换的。 再来看offer2:张家口卷烟厂(待遇约13个) 张家口卷烟厂是河北中烟下属三家卷烟厂之一,河北中烟主打的"荷花"系列连续多年位居全国高端卷烟品牌销量前列。烟草系统薪资由基本工资+绩效+年终奖构成,综合年薪普遍显著高于当地平均水平,六险二金齐全,福利拉满。有人问"13个是不是太平平无奇了"——关键张家口是四线城市,生活成本低,这13万的购买力相当于深圳的二十多万。离家500km,开车半天到家,周末回趟家完全可行,幸福感直接上两个档次。中广核的牛油说了句大实话: "哪个核电站好?永远是离家近的那个最好。" 选烟厂同理。 但是,卷烟厂的坑你得清楚: 首先卷烟厂和烟草局不一样,卷烟厂是生产操作类岗位,很多要三班倒。报考条件明确写了要能"胜任夜班工作和长时间站立工作"。一线操作工每天盯着流水线卷烟,工作内容高度重复,有入职的人描述为"食之无味弃之可惜"。有牛油直言"卷烟厂和商业性质的烟草公司不一样,前者很坑很累"。其次你家里人不是不喜欢,而是担心你这211计算机科班出身,进了烟厂干操作工,技能会快速退化,未来如果行业改革,技术壁垒不高,转行比较困难。等你干两年再跳出来,技术栈全忘干净了,回头再去敲代码,发现连应届生都卷不过。 老牛油的灵魂三问: 1. 你是更怕穷,还是更怕想家? 如果特别恋家的人跑1800km之外,第一年哭鼻子的概率高达80%。陆丰那地方偏僻单调,核电基地又远又闷,闲下来除了打游戏没啥娱乐,社交圈也窄。找个对象都费劲——牛油亲测核电站"狼多肉少"。 2. 你的代码能力有多"一般"? 如果真的一般,仪控调试和你专业匹配度不算高,这活儿主要是工程改造设计、现场实施管理、在建机组设计审查等,偏工程向而非纯软开。干两年后跳回互联网赛道,竞争力不一定有明显提升。反倒是烟厂不需要你写代码,进去就是稳定躺平。 3. 烟草局下岸这事儿会不会让你耿耿于怀? 如果烟草局是你第一志愿,烟厂只是plan B,那得想清楚:进去了可能每天看着天花板想"如果当初去了烟草局该多好",这种内耗比钱少还折磨人。如果你能接受"反正都是烟草系统,先进去再说"的心态,那倒无所谓。 一句话总结: 如果年轻想拼想闯做技术积累,中广核虽然累和远,但简历上央企核电的金字招牌确实有含金量,加上到手收入在这两个选项里确实更高,考虑到你个人经济情况和家庭状况,假如家里不需要你常回去照顾,家里有兄弟姐妹帮手分担,那先去核电待三四年,积累经验再跳槽也不失为一步棋。 如果想安稳过日子离家近当"人上人",烟厂低线生活成本加持,加上稳定的编制和福利体系,在张家***得滋润,幸福感吊打陆丰。尤其家里人是那种离不开你的,有烟厂的稳定且离家近,比任何高薪都实在。
点赞 评论 收藏
分享
评论
6
24
分享

创作者周榜

更多
牛客网
牛客网在线编程
牛客网题解
牛客企业服务