Apache Kafka源码分析 – Log Management

简介:

LogManager

LogManager会管理broker上所有的logs(在一个log目录下),一个topic的一个partition对应于一个log(一个log子目录)
首先loadLogs会加载每个partition所对应的log对象, 然后提供createLog,getLog,deleteLog之类的管理接口
并且会创建些后台线程来进行,cleanup,flush,checkpoint生成之类的工作


Log

Log只是对于LogSegments的封装,包含loadSegments,append(到active segment),read(需要定位到相应的segment)

LogSegment

Segment是个逻辑概念,为了防止log文件过大, 将log分成许多的LogSegments
Segment又分为两部分,MessageSet文件和Index文件,分别命名为[base_offset].log和[base_offset].index
base_offset就是该Segment的起始offset,比前一个segment里面的offset都要大

Segment提供对于MessageSet的读写接口 
写,需要间隔的更新index文件,应该为了尽量减小index的size,所以只是当写入数据大于indexIntervalBytes时,才增加一条索引
读,由于user传入的是逻辑offest,需要先转化为物理地址才能从文件中读到数据,如何转化参考下面

同时index文件是可以根据MessageSet文件重新rebuild的

FileMessageSet

Segment中实际存放log message的文件,通过FileChannel可以读写文件

   1: /**
   2:  * An on-disk message set. An optional start and end position can be applied to the message set
   3:  * which will allow slicing a subset of the file.
   4:  * @param file The file name for the underlying log data
   5:  * @param channel the underlying file channel used
   6:  * @param start A lower bound on the absolute position in the file from which the message set begins
   7:  * @param end The upper bound on the absolute position in the file at which the message set ends
   8:  * @param isSlice Should the start and end parameters be used for slicing?
   9:  */
  10: @nonthreadsafe
  11: class FileMessageSet private[kafka](@volatile var file: File,
  12:                                     private[log] val channel: FileChannel,
  13:                                     private[log] val start: Int,
  14:                                     private[log] val end: Int,
  15:                                     isSlice: Boolean) extends MessageSet with Logging {...}

OffsetIndex

Segment的index文件, 这是0.8后加上的,之前message直接使用物理offset标识 
新版本中还是改成了使用逻辑offset,让物理地址对用户透明, 这样就需要一个index来匹配逻辑offset和物理地址 
index考虑到效率,最好放在内存中,但是考虑到size问题, 所以使用MappedByteBuffer(参考,Java RandomAccessFile用法 ) 
注释里面说, 
Index是sparse的,不保证每个message在index都有索引的entry 
Index由entry组成,每个entry为8-byte,逻辑offset4-byte,物理地址4-byte 
并且逻辑offset是基于base offset的相对offset,否则无法保证只使用4-byte

   1: /**
   2:  * An index that maps offsets to physical file locations for a particular log segment. This index may be sparse:
   3:  * that is it may not hold an entry for all messages in the log.
   4:  * 
   5:  * The index is stored in a file that is pre-allocated to hold a fixed maximum number of 8-byte entries.
   6:  * 
   7:  * The index supports lookups against a memory-map of this file. These lookups are done using a simple binary search variant
   8:  * to locate the offset/location pair for the greatest offset less than or equal to the target offset.
   9:  * 
  10:  * Index files can be opened in two ways: either as an empty, mutable index that allows appends or
  11:  * an immutable read-only index file that has previously been populated. The makeReadOnly method will turn a mutable file into an 
  12:  * immutable one and truncate off any extra bytes. This is done when the index file is rolled over.
  13:  * 
  14:  * No attempt is made to checksum the contents of this file, in the event of a crash it is rebuilt.
  15:  * 
  16:  * The file format is a series of entries. The physical format is a 4 byte "relative" offset and a 4 byte file location for the 
  17:  * message with that offset. The offset stored is relative to the base offset of the index file. So, for example,
  18:  * if the base offset was 50, then the offset 55 would be stored as 5. Using relative offsets in this way let's us use
  19:  * only 4 bytes for the offset.
  20:  * 
  21:  * The frequency of entries is up to the user of this class.
  22:  * 
  23:  * All external APIs translate from relative offsets to full offsets, so users of this class do not interact with the internal 
  24:  * storage format.
  25:  */
  26: class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSize: Int = -1) extends Logging {
  27:   private val lock = new ReentrantLock  //操作index文件需要加锁
  28:   
  29:   /* initialize the memory mapping for this index */
  30:   private var mmap: MappedByteBuffer =  //使用MappedByteBuffer来操作index文件以应对大文件
  31:     {
  32:       val newlyCreated = file.createNewFile()
  33:       val raf = new RandomAccessFile(file, "rw")
  34:       val len = raf.length()
  35:       val idx = raf.getChannel.map(FileChannel.MapMode.READ_WRITE, 0, len)          
  36:     }
  37:  
  38:   //通过byte偏移从buffer中读出某个entry的内容,offset和physical地址
  39:   /* return the nth offset relative to the base offset */
  40:   private def relativeOffset(buffer: ByteBuffer, n: Int): Int = buffer.getInt(n * 8)
  41:   /* return the nth physical position */
  42:   private def physical(buffer: ByteBuffer, n: Int): Int = buffer.getInt(n * 8 + 4)
  43:  
  44:   //通过二分查找找到targetOffset或最接近的offset(less than)
  45:   /**
  46:    * Find the largest offset less than or equal to the given targetOffset 
  47:    * and return a pair holding this offset and it's corresponding physical file position.
  48:    * 
  49:    * @param targetOffset The offset to look up.
  50:    * 
  51:    * @return The offset found and the corresponding file position for this offset. 
  52:    * If the target offset is smaller than the least entry in the index (or the index is empty),
  53:    * the pair (baseOffset, 0) is returned.
  54:    */
  55:   def lookup(targetOffset: Long): OffsetPosition = {...}
  56:  
  57: /**
  58:  * Get the nth offset mapping from the index
  59:  * @param n The entry number in the index
  60:  * @return The offset/position pair at that entry
  61:  */
  62: def entry(n: Int): OffsetPosition = {
  63:   maybeLock(lock) {
  64:     if(n >= entries)
  65:       throw new IllegalArgumentException("Attempt to fetch the %dth entry from an index of size %d.".format(n, entries))
  66:     val idx = mmap.duplicate
  67:     OffsetPosition(relativeOffset(idx, n), physical(idx, n))
  68:   }
  69: }
  70:  
  71: /**
  72:  * Append an entry for the given offset/location pair to the index. This entry must have a larger offset than all subsequent entries.
  73:  */
  74: def append(offset: Long, position: Int) {
  75:   inLock(lock) {
  76:     require(!isFull, "Attempt to append to a full index (size = " + size + ").")
  77:     if (size.get == 0 || offset > lastOffset) {
  78:       debug("Adding index entry %d => %d to %s.".format(offset, position, file.getName))
  79:       this.mmap.putInt((offset - baseOffset).toInt)
  80:       this.mmap.putInt(position)
  81:       this.size.incrementAndGet()
  82:       this.lastOffset = offset
  83:       require(entries * 8 == mmap.position, entries + " entries but file position in index is " + mmap.position + ".")
  84:     } else {
  85:       throw new InvalidOffsetException("Attempt to append an offset (%d) to position %d no larger than the last offset appended (%d) to %s."
  86:         .format(offset, entries, lastOffset, file.getAbsolutePath))
  87:     }
  88:   }
  89: }

具体看看如何从逻辑offset转化为物理地址的?

0.8中增加了逻辑offset,那么就需要做逻辑offset和物理地址间的转化
简单的方法,直接用hashmap,cache所有offset,问题就是这样空间耗费比较大
所以kafka的方式,是分段索引,用offset通过二分查找中index中找出段的起始地址,然后再去file里面遍历找出精确的地址, 时间换空间的设计

1. LogSegment.translateOffset
首先是从index文件中找到近似的物理地址
前面说了,index中从效率考虑并不会为每个offset建立索引entry,只会分段建立offset索引, 所以从index中直接可以找到精确物理地址的概率不大,但是可以找到最接近的那个物理地址
如果你觉得index的粒度比较粗,可以直接给出开始查找的startingFilePosition
所以精确的物理地址需要到MessageSet文件里面去继续找

2. FileMessageSet.searchFor
在messageSet中,message的构成是,overhead(MessageSize+Offset)和message
而searchFor的逻辑是从startingPosition开始, 逐条遍历各个message,并从overhead中取出offset进行比较,直到找到target offset为止


本文章摘自博客园,原文发布日期:2014-02-18

目录
相关文章
|
20天前
|
消息中间件 安全 Kafka
Apache Kafka安全加固指南:保护你的消息传递系统
【10月更文挑战第24天】在现代企业环境中,数据的安全性和隐私保护至关重要。Apache Kafka作为一款广泛使用的分布式流处理平台,其安全性直接影响着业务的稳定性和用户数据的安全。作为一名资深的Kafka使用者,我深知加强Kafka安全性的重要性。本文将从个人角度出发,分享我在实践中积累的经验,帮助读者了解如何有效地保护Kafka消息传递系统的安全性。
46 7
|
20天前
|
消息中间件 数据挖掘 Kafka
Apache Kafka流处理实战:构建实时数据分析应用
【10月更文挑战第24天】在当今这个数据爆炸的时代,能够快速准确地处理实时数据变得尤为重要。无论是金融交易监控、网络行为分析还是物联网设备的数据收集,实时数据处理技术都是不可或缺的一部分。Apache Kafka作为一款高性能的消息队列系统,不仅支持传统的消息传递模式,还提供了强大的流处理能力,能够帮助开发者构建高效、可扩展的实时数据分析应用。
64 5
|
18天前
|
消息中间件 Ubuntu Java
Ubuntu系统上安装Apache Kafka
Ubuntu系统上安装Apache Kafka
|
19天前
|
消息中间件 监控 Kafka
Apache Kafka 成为处理实时数据流的关键组件。Kafka Manager 提供了一个简洁的 Web 界面
随着大数据技术的发展,Apache Kafka 成为处理实时数据流的关键组件。Kafka Manager 提供了一个简洁的 Web 界面,方便管理和监控 Kafka 集群。本文详细介绍了 Kafka Manager 的部署步骤和基本使用方法,包括配置文件的修改、启动命令、API 示例代码等,帮助你快速上手并有效管理 Kafka 集群。
39 0
|
1月前
|
消息中间件 存储 运维
为什么说Kafka还不是完美的实时数据通道
【10月更文挑战第19天】Kafka 虽然作为数据通道被广泛应用,但在实时性、数据一致性、性能及管理方面存在局限。数据延迟受消息堆积和分区再平衡影响;数据一致性难以达到恰好一次;性能瓶颈在于网络和磁盘I/O;管理复杂性涉及集群配置与版本升级。
|
1月前
|
消息中间件 Java Kafka
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
46 1
|
3月前
|
消息中间件 Java Kafka
Kafka不重复消费的终极秘籍!解锁幂等性、偏移量、去重神器,让你的数据流稳如老狗,告别数据混乱时代!
【8月更文挑战第24天】Apache Kafka作为一款领先的分布式流处理平台,凭借其卓越的高吞吐量与低延迟特性,在大数据处理领域中占据重要地位。然而,在利用Kafka进行数据处理时,如何有效避免重复消费成为众多开发者关注的焦点。本文深入探讨了Kafka中可能出现重复消费的原因,并提出了四种实用的解决方案:利用消息偏移量手动控制消费进度;启用幂等性生产者确保消息不被重复发送;在消费者端实施去重机制;以及借助Kafka的事务支持实现精确的一次性处理。通过这些方法,开发者可根据不同的应用场景灵活选择最适合的策略,从而保障数据处理的准确性和一致性。
271 9
|
3月前
|
消息中间件 负载均衡 Java
"Kafka核心机制揭秘:深入探索Producer的高效数据发布策略与Java实战应用"
【8月更文挑战第10天】Apache Kafka作为顶级分布式流处理平台,其Producer组件是数据高效发布的引擎。Producer遵循高吞吐、低延迟等设计原则,采用分批发送、异步处理及数据压缩等技术提升性能。它支持按消息键值分区,确保数据有序并实现负载均衡;提供多种确认机制保证可靠性;具备失败重试功能确保消息最终送达。Java示例展示了基本配置与消息发送流程,体现了Producer的强大与灵活性。
67 3
|
3月前
|
vr&ar 图形学 开发者
步入未来科技前沿:全方位解读Unity在VR/AR开发中的应用技巧,带你轻松打造震撼人心的沉浸式虚拟现实与增强现实体验——附详细示例代码与实战指南
【8月更文挑战第31天】虚拟现实(VR)和增强现实(AR)技术正深刻改变生活,从教育、娱乐到医疗、工业,应用广泛。Unity作为强大的游戏开发引擎,适用于构建高质量的VR/AR应用,支持Oculus Rift、HTC Vive、Microsoft HoloLens、ARKit和ARCore等平台。本文将介绍如何使用Unity创建沉浸式虚拟体验,包括设置项目、添加相机、处理用户输入等,并通过具体示例代码展示实现过程。无论是完全沉浸式的VR体验,还是将数字内容叠加到现实世界的AR应用,Unity均提供了所需的一切工具。
135 0
|
3月前
|
消息中间件 存储 关系型数据库
实时计算 Flink版产品使用问题之如何使用Kafka Connector将数据写入到Kafka
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。

推荐镜像

更多