什么是Kafka
Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。
Kafka是一种高吞吐量的分布式发布订阅消息系统,作为消息中间件来说都起到了系统间解耦、异步、削峰等作用,同时又提供了Kafka streaming插件包在应用端实现实时在线流处理,它可以收集并处理用户在网站中的所有动作流数据以及物联网设备的采样信息。
特性
高吞吐
kafka高吞吐表现以下几点:
Topic分区
- 同一个Topic的多个分区同时写入消息/多个消费者同时间消费不同分区消息
- 不保证全局FIFO
- 集群环境中的数据分布均匀
顺序IO
- 顺序IO读写操作的访问地址连续,减少寻址时间
- 随机IO读写操作时间连续,访问地址不连续
内存映射(memory Map File)
- 当CPU读取数据时,是由内存管理单元(MMU)管理的。
- 内核空间PageCache->磁盘(操作系统决定何时刷新)断电丢数据
零拷贝
直接将数据从内核空间传递输出
- 零拷贝:磁盘->内核内存->socket buffer(内核态)->网络
- 常规copy:磁盘->内核内存->应用->socket buffer->网络
- 常规io:用户空间系统调用->CPU IO读取->硬盘->写入磁盘缓冲->CPU将缓冲拷贝内核缓冲->返回用户空间。全程阻塞
- DMA:用户空间系统调用->CPU IO读取->DMA IO读取->硬盘->写入磁盘缓冲->DMA将缓冲拷贝内核缓冲->通知CPU->返回用户空间。CPU不阻塞
Topic分区
- 可以指定一个Topic中有多少分区
- 可以指定生产/消费某个分区消息(默认轮询)
生产者
- 可以指定分区生产消息
- 拦截器:生产者生产消息时可做全局拦截,对消息进行统一处理
- 幂等:生产消息生成唯一标识,分区中存在唯一标识则拒绝
- 事务:事务保证幂等性,保证多个分区的数据原子性
消费者
- 消费者用一个消费组标记自己,同一个topic相同消费组轮询读取,不同消费组同时读取,消费者多余分区则会空闲
- 消费者定期(可配置)提交偏移量,也可手动提交偏移量
- latest(读取最新偏移量) earliest(如果系统没有消费者的偏移量,读取该分区最早的偏移量)none(异常)
- 可设置偏移量开始读取
集群
集群角色
角色 | 作用 |
---|---|
Leader | 负责分区的读写 |
Follower | 负责同步数据 |
副本备份(数据同步)
- LEO(last end offset):每个副本都有一个 log end offset,标识分区最后一条消息的下一个位置。
- HW(高水位线):所有在HW之前的数据都已经备份成功
0.10版本
数据丢失: brokerA(副本)HW为0,brokerB(Leader)HW为1,brokerA重启未同步HW至1,重启后重新向brokerB同步,此时brokerB也断电重启,选举Leader为brokerA后数据回到HW为0的数据造成丢失
数据不一致:brokerA(副本)HW为0,brokerB(Leader)HW为1,同时宕机,重启后brokerA为Leader,写入数据后HW为1,此时brokerB启动成功后同步brokerA HW,HW一致不同步数据。
两者问题都是由于Leader宕机导致
0.11版本解决以上两个问题
引入Leader Epoch,每个分区有Leader Epoch Seq。所谓leader epoch实际上是一对值:(epoch,offset)。epoch表示leader的版本号,从0开始,当leader变更过1次时epoch就会+1,而offset则对应于该epoch版本的leader写入第一条消息的位移。因此假设有两对值:
(0, 0)
(1, 120)
则表示第一个leader从位移0开始写入消息;共写了120条[0, 119];而第二个leader版本号是1,从位移120处开始写入消息。
leader broker中会保存这样的一个缓存,并定期地写入到一个checkpoint文件中。
当leader写底层log时它会尝试更新整个缓存——如果这个leader首次写消息,则会在缓存中增加一个条目;否则就不做更新。而每次副本重新成为leader时会查询这部分缓存,获取出对应leader版本的位移,这就不会发生数据不一致和丢失的情况。
基本命令
创建Topic
./bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic topicbg --partitions 3 --replicatioon-factor 1
启动一个消费者
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topicbg --group groupbg
启动一个生产者
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topicbg
插件
kafka管理界面:kafka-eagle