开发者社区> 青衫无名> 正文

《KAFKA官方文档》设计与实现(二)

简介: 5.4 消息格式 /** * 1. 消息的4字节CRC32 * 2. 一个字节的 identifier ,用以格式的变化,变化的值为0 或者1 * 3. 一个字节的 identifier属性,允许消息的注释与版本无关 * 位 0 ~ 2 : 压缩编解码 * 0 : 无压缩 * 1 :
+关注继续查看

5.4 消息格式

/**
* 1. 消息的4字节CRC32
* 2. 一个字节的 identifier ,用以格式的变化,变化的值为0 或者1
* 3. 一个字节的 identifier属性,允许消息的注释与版本无关
* 位 0 ~ 2 : 压缩编解码
* 0 : 无压缩
* 1 : gzip
* 2 : snappy
* 3 : lz4
* bit 3 : 时间戳类型
* 0 : 创建时间
* 1 : 日志追加时间
* bit 4 ~ 7 : 保留位
* 4. (可选的) 8字节时间戳只有当“magic”标识符大于0
* 5. 4字节密钥长度,包含长度k
* 6. K 字节的 key
* 7. 4字节有效负载长度,含长度v
* 8. V字节的有效负载
*/

5.5 日志

topic名字为”my_topic”的日志有两个分区,并且包含两个目录(也就是my_topic_0my_topic_1),目录里面包含的是该topic的消息数据文件。日志文件的格式是“日志条目”序列;每个日志条目是一个4字节整数n存储的消息长度,其次是N消息字节.每个消息唯一标识是一个64位整数的偏移量,该偏移量由发送到该主题的该分区的所有消息流中的消息的开始的字节位置给出。每个消息的磁盘格式如下。每个日志文件以其包含的第一条消息的偏移量命名。所以第一个创建的文件将是00000000000.kafka,每个附加文件将有一个大致S字节整数的名字,S是指之前在配置中设置的日志文件最大值

确切的二进制消息格式是版本化的并且是以标准的接口进行维护,因此,当进行容错时,消息集可以在生产者,kafka节点,以及客户端之间进行相互的任意转换,而不用复制和调节。改格式如下:

磁盘上消息的格式

offset : 8 bytes
message length : 4 bytes (value: 4 + 1 + 1 + 8(if magic value > 0) + 4 + K + 4 + V)
crc : 4 bytes
magic value : 1 byte
attributes : 1 byte
timestamp : 8 bytes (Only exists when magic value is greater than zero)
key length : 4 bytes
key : K bytes
value length : 4 bytes
value : V bytes

使用消息偏移作为消息ID是极其少见的。我们最初的想法是使用一个由生产者自带的GUID自增长实现,并且保存从GUID到每一个节点上偏移量的映射。但由于每个消费者必须持有每个server的ID,因此GUID的全局唯一性将无法提供。更重要的是持有随机id至分区偏移量
的复杂度需要很重的索引结构,而且必须要与磁盘同步,这在本质上就需要一种完全的持久化的随机存取数据结构。因此,为了简化查找结构,我们决定使用一个简单的针对每个分区的原子计数器,它可以用分区ID和节点ID唯一标识消息;这使得查找结构更简单,尽管多每个消费者的多个请求仍然是可能的。然而,一旦我们解决了计数器的问题,跳转至直接使用偏移量似乎变得顺理成章了,毕竟对于任意一个分区而言,计数器是单调唯一增长的。由于偏移量的实现对于消费者是不可见的,因此最终可以采取一种更为高效的方式具体实现(此处句子读得不太明白)。

Kafka Log Implementation

写(Write)

日志允许串行追加,通常是追加到最后一个文件,当达到配置的最大值时,该文件会在另一个新的文件上继续追加。日志文件有两个配置参数:m是操作系统将文件刷新到磁盘之前,可写入的消息的数目,s是指强制多少秒执行一次刷新操作。这给出了一个持久性保障,在系统崩溃时,至多丢失M条消息或S秒的数据。

读(Read)

消息的读取是依据消息在分区中的64位逻辑偏移量以及S字节的最大读取值完成。这个将返回在S字节缓冲区中的消息迭代器。S值的配置应比任何消息的长度要大,但当消息的长度异常地大时,消息的读取将被重试多次,每一次重试都会扩大缓冲区一倍,直到消息被成功读取。消息的最大值缓冲区的最大值均可设置,如此服务器便可以拒绝处理比它们要大的消息,并提供客户端上要读取完整消息所需的最大值的绑定。已分区消息为结尾去读取缓冲区是有可能的,大小的分割将会被很容易地检测到。

从偏移量读取消息的实际过程需要首先定位存储数据的日志段文件,从全局偏移量计算文件特定的偏移量,然后从该文件偏移量读取。这是一个简单的对每个文件保持在内存范围内变化的二分查找

该日志提供了获取最近写入消息的能力,允许客户端“实时”订阅。这在消费者未能消费指定天数消息的场景下尤为有用。在这种情况下,消费者试图去消费一个由OutOfRangeException引起的不存在的偏移量,要么采取重置自身的方式,要么消费失败并作为消费失败的案例(有些许问题)

如下是发送给消费者的结果的格式

MessageSetSend (fetch result)

total length : 4 bytes
error code : 2 bytes
message 1 : x bytes
...
message n : x bytes
MultiMessageSetSend (multiFetch result)

total length : 4 bytes
error code : 2 bytes
messageSetSend 1
...
messageSetSend n

删除(Delete)

数据删除是一次删除一个日志段。日志管理器允许可插入的删除策略来选择哪些文件适合删除。当前的策略是删除任何超过 N天前修改时间的日志,虽然保留最后N GB的政策也可以是有用的。为了避免读取时锁定,而仍然允许修改段列表的删除,我们使用一个 copy-on-write样式分段列表实现,它提供一致的视图,允许以二分查找的方式读取日志段的不变的静态快照视图,且删除操作也同步进行。

保证(Guarantees)

kafka日志提供了一个可配置的参数M,该参数可以控制在消息被强制写入到磁盘前消息的最大数量。在kafka服务启动时,一个日志恢复线程会伴随着启动,它迭代所有最新分段日志的消息,并且校验所有消息项的有效性。如果消息的大小和偏移量的总和不超过该文件的长度并且消息有效负载的CRC32匹配消息存储有CRC信息,那么该消息项就是有效的。在检测到崩溃事件时,日志将被截断为最后有效的偏移量。

请注意,两种类型的崩溃必须处理:由于崩溃导致的写块丢失损坏,以及无意义区块被追加到文件的损坏。这样做的原因是,在一般的操作系统不保证在文件的节点之间的和实际的数据块的写入顺序,因此除了失去已写入的数据,假如节点正在更新值,但在数据写入之前发生崩溃了,那么该文件获取到的都是无意义的数据。CRC就是检测这样的边缘案例,并防止它损坏日志(当然,未写入的消息会丢失)

转载自 并发编程网 - ifeve.com

版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

相关文章
阿里云ECS云服务器初始化设置教程方法
阿里云ECS云服务器初始化是指将云服务器系统恢复到最初状态的过程,阿里云的服务器初始化是通过更换系统盘来实现的,是免费的,阿里云百科网分享服务器初始化教程: 服务器初始化教程方法 本文的服务器初始化是指将ECS云服务器系统恢复到最初状态,服务器中的数据也会被清空,所以初始化之前一定要先备份好。
13801 0
阿里云服务器端口号设置
阿里云服务器初级使用者可能面临的问题之一. 使用tomcat或者其他服务器软件设置端口号后,比如 一些不是默认的, mysql的 3306, mssql的1433,有时候打不开网页, 原因是没有在ecs安全组去设置这个端口号. 解决: 点击ecs下网络和安全下的安全组 在弹出的安全组中,如果没有就新建安全组,然后点击配置规则 最后如上图点击添加...或快速创建.   have fun!  将编程看作是一门艺术,而不单单是个技术。
18392 0
阿里云服务器ECS登录用户名是什么?系统不同默认账号也不同
阿里云服务器Windows系统默认用户名administrator,Linux镜像服务器用户名root
13863 0
阿里云服务器如何登录?阿里云服务器的三种登录方法
购买阿里云ECS云服务器后如何登录?场景不同,阿里云优惠总结大概有三种登录方式: 登录到ECS云服务器控制台 在ECS云服务器控制台用户可以更改密码、更换系.
24949 0
《KAFKA官方文档》第三章:快速入门(二)
第八步:使用Kafka流(Kafka Streams)处理数据 Kafka流是一个针对存储于Kafka brokers上的数据进行实时流处理和分析的客户端类库。快速入门中的示例将展示如何使用这个类库实现一个数据流处理应用。
1366 0
阿里云服务器怎么设置密码?怎么停机?怎么重启服务器?
如果在创建实例时没有设置密码,或者密码丢失,您可以在控制台上重新设置实例的登录密码。本文仅描述如何在 ECS 管理控制台上修改实例登录密码。
20107 0
《KAFKA官方文档》第三章:快速入门(一)
快速入门 翻译者:kimmking@163.com 原文:kafka.apache.org/quickstart 本教程假设读者完全从零开始,电脑上没有已经存在的Kafka和Zookeeper环境。
1869 0
+关注
3598
文章
840
问答
文章排行榜
最热
最新
相关电子书
更多
JS零基础入门教程(上册)
立即下载
性能优化方法论
立即下载
手把手学习日志服务SLS,云启实验室实战指南
立即下载