kafka的工作流程是怎么样的?
1.首先一个kafka集群有很多个kafka的服务器,每个kafka服务器就是一个broker,每一类消息有一个topic,生产者将一个消息发送给broker。
2.每个topic会有一个或者多个分区,broker根据分发机制将这个消息分给这个topic下的某个分区的leader,
分发机制:
- 1.发的消息指定了分区就发到特定分区下
- 2.指定了key,就根据murmur2 哈希算法对key计算得到一个哈希值,将哈希值与分区数量取余,得到分区。
- 3.没有指定分区,也没有指定key,那么就根据一个自增计数与分区数取余得到分区,这样可以让消息分发在每个分区更加均匀。
3.每个分区就是一个目录,目录名是topic+分区编号,在收到消息后会将消息写入到日志文件中,如果一个分区的消息都有存放在一个日志文件中,那么文件会比较大,查询时会比较慢,而且也不便于之后删除旧的消息。所以每个分区对应多个大小相等的segment文件,每个segment的名称是上一个segment最后一条消息的offset,一个segment有两个文件,一个是.index文件,记录了消息的offset及这条消息数据在log文件中的偏移量。一个是.log文件,实际存储每个消息数据,每条消息数据大小不一,每条消息数据包含offset,消息体大小,消息体等等内容。查的时候根据offset先去index文件找到偏移量,然后去log文件中读。
(具体的segment切分有很多个触发条件:
当log文件>log.segment.bytes时切分,默认是1G。
或者是segment文件中最早的消息距离现在的时间>log.roll.ms配置的时间,默认是7天。
或者是索引文件index>log.index.size.max.bytes的大小,默认是10M。)
4.分区leader将消息存储到日志文件中后还不能算是写成功,会把消息同步给所有follower,当follower同步好消息之后就会给leader发ack,leader收到所有follower返回的ack之后,这条才算是写成功,然后才会给生产者返回写成功。(依据ACK配置来决定多少follower同步成功才算生产者发送消息成功)
5.消费者读数据时就去分区的leader中去读,一个消费者可以消费多个分区,但是一个分区只能一个消费者来消费,默认消费者取完数据就会自动提交,一般会关闭自动提交,消费者消费成功后,进行手动提交,分区的offset才会向后移动。(默认是会自动提交,一般会关闭自动提交)
注意事项:
1.replication.factor>=2,也就是一个分区至少会有两个副本。
2.min.insync.replicas默认是1,leader至少要有一个follow跟自己保持联系没有掉线。(这个配置只有在ack为all或者-1时有用,也就是ack为all也只是要求生产者发送的消息,被leader以及ISR集合里面的从节点接收到,就算所有节点都接收到了。)
3.一般设置了ack=all就不会丢数据。因为会保证所有的follower都收到消息,才算broker接收成功,默认ack=1。
4.retries=,生产者写入消息失败后的重试次数。
5.每个partition有一个offset,
6.生产者ACK配置:
1(默认) 数据发送到Kafka后,经过leader成功接收消息的的确认,就算是发送成功了。在这种情况下,如果leader宕机了,则会丢失数据。
0 生产者将数据发送出去就不管了,不去等待任何返回。这种情况下数据传输效率最高,但是数据可靠性确是最低的。
-1 也就是all,producer需要等待ISR中的所有follower都确认接收到数据后才算一次发送完成,可靠性最高。
怎么防止Kafka 丢数据?
这块比较常见的一个场景,就是 Kafka
某个 broker
宕机,然后重新选举 partition
的 leader
。大家想想,要是此时其他的 follower
刚好还有些数据没有同步,结果此时 leader
挂了,然后选举某个 follower
成 leader
之后,不就少了一些数据?这就丢了一些数据啊。
此时一般是要求起码设置如下 4 个参数:
- 给
topic
设置replication.factor
参数:这个值必须大于 1,要求每个partition
必须有 至少 2 个副本。 - 在
Kafka
服务端设置min.insync.replicas
参数:这个值必须大于 1,这个是 要求一个 leader 至少感知到有至少一个 follower 还跟自己保持联系,没掉队,这样才能确保leader
挂了还有一个follower
吧。 - 在
producer
端设置acks=all
:这个是要求每条数据,必须是写入所有 replica 之后,才能认为是写成功了。 - 在
producer
端设置retries=MAX
(很大很大很大的一个值,无限次重试的意思):这个是要求一旦写入失败,就无限重试,卡在这里了。
这样配置之后,至少在Kafka broker
端就可以保证在leader
所在 broker
发生故障,进行leader
切换时,数据不会丢失。
生产者会不会弄丢数据?
如果按照上述的思路设置了acks=all
,一定不会丢,要求是,你的 leader
接收到消息,所有的follower
都同步到了消息之后,才认为本次写成功了。如果没满足这个条件,生产者可以自动不断的重试,重试无限次。
怎么实现 Exactly-Once?
生产端幂等性发送
为了实现Producer的幂等语义,Kafka引入了Producer ID
(即PID
)和Sequence Number
。每个新的Producer在初始化的时候会被分配一个唯一的PID,该PID对用户完全透明而不会暴露给用户。
对于每个PID,该Producer发送数据的每个<Topic, Partition>
都对应一个从0开始单调递增的Sequence Number
。
类似地,Broker端也会为每个<PID, Topic, Partition>
维护一个序号,并且每次Commit一条消息时将其对应序号递增。对于接收的每条消息,如果其序号比Broker维护的序号(即最后一次Commit的消息的序号)大一,则Broker会接受它,否则将其丢弃:
- 如果消息序号比Broker维护的序号大一以上,说明中间有数据尚未写入,也即乱序,此时Broker拒绝该消息,Producer抛出
InvalidSequenceNumber
- 如果消息序号小于等于Broker维护的序号,说明该消息已被保存,即为重复消息,Broker直接丢弃该消息,Producer抛出
DuplicateSequenceNumber
上述设计解决了0.11.0.0之前版本中的两个问题:
- Broker保存消息后,发送ACK前宕机,Producer认为消息未发送成功并重试,造成数据重复
- 前一条消息发送失败,后一条消息发送成功,前一条消息重试后成功,造成数据乱序。
http://www.jasongj.com/kafka/transaction/
消费端幂等性
只能自己从业务层面保证重复消费的幂等性,例如引入版本号机制。
事务性保证
上述幂等设计只能保证单个Producer对于同一个<Topic, Partition>
的Exactly Once
语义。
另外,它并不能保证写操作的原子性——即多个写操作,要么全部被Commit要么全部不被Commit。
更不能保证多个读写操作的的原子性。尤其对于Kafka Stream应用而言,典型的操作即是从某个Topic消费数据,经过一系列转换后写回另一个Topic,保证从源Topic的读取与向目标Topic的写入的原子性有助于从故障中恢复。
事务保证可使得应用程序将生产数据和消费数据当作一个原子单元来处理,要么全部成功,要么全部失败,即使该生产或消费跨多个<Topic, Partition>
。
另外,有状态的应用也可以保证重启后从断点处继续处理,也即事务恢复。
为了实现这种效果,应用程序必须提供一个稳定的(重启后不变)唯一的ID,也即Transaction ID
。Transactin ID
与PID
可能一一对应。区别在于Transaction ID
由用户提供,而PID
是内部的实现对用户透明。
另外,为了保证新的Producer启动后,旧的具有相同Transaction ID
的Producer即失效,每次Producer通过Transaction ID
拿到PID的同时,还会获取一个单调递增的epoch。由于旧的Producer的epoch比新Producer的epoch小,Kafka可以很容易识别出该Producer是老的Producer并拒绝其请求。
有了Transaction ID
后,Kafka可保证:
- 跨Session的数据幂等发送。当具有相同
Transaction ID
的新的Producer实例被创建且工作时,旧的且拥有相同Transaction ID
的Producer将不再工作。 - 跨Session的事务恢复。如果某个应用实例宕机,新的实例可以保证任何未完成的旧的事务要么Commit要么Abort,使得新实例从一个正常状态开始工作。
需要注意的是,上述的事务保证是从Producer的角度去考虑的。从Consumer的角度来看,该保证会相对弱一些。尤其是不能保证所有被某事务Commit过的所有消息都被一起消费,因为:
- 对于压缩的Topic而言,同一事务的某些消息可能被其它版本覆盖
- 事务包含的消息可能分布在多个Segment中(即使在同一个Partition内),当老的Segment被删除时,该事务的部分数据可能会丢失
- Consumer在一个事务内可能通过seek方法访问任意Offset的消息,从而可能丢失部分消息
- Consumer可能并不需要消费某一事务内的所有Partition,因此它将永远不会读取组成该事务的所有消息