5.4 消息格式
/**
* 1. 消息的4字节CRC32
* 2. 一个字节的 identifier ,用以格式的变化,变化的值为0 或者1
* 3. 一个字节的 identifier属性,允许消息的注释与版本无关
* 位 0 ~ 2 : 压缩编解码
* 0 : 无压缩
* 1 : gzip
* 2 : snappy
* 3 : lz4
* bit 3 : 时间戳类型
* 0 : 创建时间
* 1 : 日志追加时间
* bit 4 ~ 7 : 保留位
* 4. (可选的) 8字节时间戳只有当“magic”标识符大于0
* 5. 4字节密钥长度,包含长度k
* 6. K 字节的 key
* 7. 4字节有效负载长度,含长度v
* 8. V字节的有效负载
*/
5.5 日志
topic名字为”my_topic”的日志有两个分区,并且包含两个目录(也就是my_topic_0
和my_topic_1
),目录里面包含的是该topic的消息数据文件。日志文件的格式是“日志条目”序列;每个日志条目是一个4字节整数n存储的消息长度,其次是N消息字节.每个消息唯一标识是一个64位整数的偏移量,该偏移量由发送到该主题的该分区的所有消息流中的消息的开始的字节位置给出。每个消息的磁盘格式如下。每个日志文件以其包含的第一条消息的偏移量命名。所以第一个创建的文件将是00000000000.kafka,每个附加文件将有一个大致S字节整数的名字,S是指之前在配置中设置的日志文件最大值
确切的二进制消息格式是版本化的并且是以标准的接口进行维护,因此,当进行容错时,消息集可以在生产者,kafka节点,以及客户端之间进行相互的任意转换,而不用复制和调节。改格式如下:
磁盘上消息的格式
offset : 8 bytes
message length : 4 bytes (value: 4 + 1 + 1 + 8(if magic value > 0) + 4 + K + 4 + V)
crc : 4 bytes
magic value : 1 byte
attributes : 1 byte
timestamp : 8 bytes (Only exists when magic value is greater than zero)
key length : 4 bytes
key : K bytes
value length : 4 bytes
value : V bytes
使用消息偏移作为消息ID是极其少见的。我们最初的想法是使用一个由生产者自带的GUID自增长实现,并且保存从GUID到每一个节点上偏移量的映射。但由于每个消费者必须持有每个server的ID,因此GUID的全局唯一性将无法提供。更重要的是持有随机id至分区偏移量
的复杂度需要很重的索引结构,而且必须要与磁盘同步,这在本质上就需要一种完全的持久化的随机存取数据结构。因此,为了简化查找结构,我们决定使用一个简单的针对每个分区的原子计数器,它可以用分区ID和节点ID唯一标识消息;这使得查找结构更简单,尽管多每个消费者的多个请求仍然是可能的。然而,一旦我们解决了计数器的问题,跳转至直接使用偏移量似乎变得顺理成章了,毕竟对于任意一个分区而言,计数器是单调唯一增长的。由于偏移量的实现对于消费者是不可见的,因此最终可以采取一种更为高效的方式具体实现(此处句子读得不太明白)。
写(Write)
日志允许串行追加,通常是追加到最后一个文件,当达到配置的最大值时,该文件会在另一个新的文件上继续追加。日志文件有两个配置参数:m是操作系统将文件刷新到磁盘之前,可写入的消息的数目,s是指强制多少秒执行一次刷新操作。这给出了一个持久性保障,在系统崩溃时,至多丢失M条消息或S秒的数据。
读(Read)
消息的读取是依据消息在分区中的64位逻辑偏移量以及S字节的最大读取值完成。这个将返回在S字节缓冲区中的消息迭代器。S值的配置应比任何消息的长度要大,但当消息的长度异常地大时,消息的读取将被重试多次,每一次重试都会扩大缓冲区一倍,直到消息被成功读取。消息的最大值缓冲区的最大值均可设置,如此服务器便可以拒绝处理比它们要大的消息,并提供客户端上要读取完整消息所需的最大值的绑定。已分区消息为结尾去读取缓冲区是有可能的,大小的分割将会被很容易地检测到。
从偏移量读取消息的实际过程需要首先定位存储数据的日志段文件,从全局偏移量计算文件特定的偏移量,然后从该文件偏移量读取。这是一个简单的对每个文件保持在内存范围内变化的二分查找
该日志提供了获取最近写入消息的能力,允许客户端“实时”订阅。这在消费者未能消费指定天数消息的场景下尤为有用。在这种情况下,消费者试图去消费一个由OutOfRangeException引起的不存在的偏移量,要么采取重置自身的方式,要么消费失败并作为消费失败的案例(有些许问题)
如下是发送给消费者的结果的格式
MessageSetSend (fetch result)
total length : 4 bytes
error code : 2 bytes
message 1 : x bytes
...
message n : x bytes
MultiMessageSetSend (multiFetch result)
total length : 4 bytes
error code : 2 bytes
messageSetSend 1
...
messageSetSend n
删除(Delete)
数据删除是一次删除一个日志段。日志管理器允许可插入的删除策略来选择哪些文件适合删除。当前的策略是删除任何超过 N天前修改时间的日志,虽然保留最后N GB的政策也可以是有用的。为了避免读取时锁定,而仍然允许修改段列表的删除,我们使用一个 copy-on-write样式分段列表实现,它提供一致的视图,允许以二分查找的方式读取日志段的不变的静态快照视图,且删除操作也同步进行。
保证(Guarantees)
kafka日志提供了一个可配置的参数M,该参数可以控制在消息被强制写入到磁盘前消息的最大数量。在kafka服务启动时,一个日志恢复线程会伴随着启动,它迭代所有最新分段日志的消息,并且校验所有消息项的有效性。如果消息的大小和偏移量的总和不超过该文件的长度并且消息有效负载的CRC32匹配消息存储有CRC信息,那么该消息项就是有效的。在检测到崩溃事件时,日志将被截断为最后有效的偏移量。
请注意,两种类型的崩溃必须处理:由于崩溃导致的写块丢失损坏,以及无意义区块被追加到文件的损坏。这样做的原因是,在一般的操作系统不保证在文件的节点之间的和实际的数据块的写入顺序,因此除了失去已写入的数据,假如节点正在更新值,但在数据写入之前发生崩溃了,那么该文件获取到的都是无意义的数据。CRC就是检测这样的边缘案例,并防止它损坏日志(当然,未写入的消息会丢失)