Apache Kafka源码分析 – Controller

简介:

One of the brokers is elected as the controller for the whole cluster. It will be responsible for:

  1. Leadership change of a partition (each leader can independently update ISR)
  2. New topics; deleted topics
  3. Replica re-assignment

After the controller makes a decision, it publishes the decision permanently in ZK and also sends the new decisions to affected brokers through direct RPC. The published decisions are the source of truth and they are used by clients for request routing and by each broker during startup to recover its state. After the broker is started, it picks up new decisions made by the controller through RPC.

Potential benefits:

  1. Easier debugging since leadership changes are made in a central place.
  2. ZK reads/writes needed for leadership changes can be batched (also easier to exploit ZK multi) and thus reduce end-to-end latency during failover.
  3. Fewer ZK watchers.
  4. More efficient communication of state changes by using direct RPC, instead of via a queue implementation in Zookeeper.

Potential downside:

  1. Need controller failover. 

ControllerContext

其中关键是记录所有topics的partitions和replicas间关系,包括assignment和leadship关系

PartitionStateMachine

加入replica是很复杂的设计,其中重要的一点就是要考虑partitions和replicas中不同情况下的处理
这里先看看Partition的各种state,以及state之间的状态机
NonExistentPartition,不存在或被删掉的p,前一个状态是OfflinePartition
NewPartition, 刚被创建,但还没有完成leader election, 前一个状态是NonExistentPartition
OnlinePartition,具有leader,正常状态,前一个状态是NewPartition/OfflinePartition
OfflinePartition,leader die的时候partition会变为offline,前一个状态是NewPartition/OnlinePartition
这里NewPartition和OfflinePartition都是没有leader的状态,为何要区别开来?见下


核心函数handleStateChanges,对于每个topicAndPartition调用状态机函数handleStateChange,然后把结果告诉每个brokers

状态机函数的具体逻辑, 其他的状态变化都比较简单
唯有,在变成OnlinePartition的时候,需要区分new或offline两种状况,下面具体看下


对于初始化leader只需要取出liveAssignedReplicas的head
而对于offline,需要优先ISR中的,然后才是AR中的 (初始时,AR应该=ISR,但因为会通过shrink isr来调整isr列表,所以AR应该是>=ISR的)

NewPartition->OnlinePartition

OfflinePartition,OnlinePartition->OnlinePartition 
OfflinePartitionLeaderSelector

ReplicaStateMachine

对应于Replica也有比较复杂的状态和相应的状态机
NewReplica,新创建的,只能接受become follower请求,前一个状态为NonExistentReplica
OnlineReplica,正常状态,可以接受become leader和become follower,前一个状态为NewReplica, OnlineReplica or OfflineReplica
OfflineReplica ,replica die,一般由于存储该replica的broker down,前一个状态为NewReplica, OnlineReplica
Replica的删除有3种状态,offline的replica就可以被delete
ReplicaDeletionStarted,前一个状态是OfflineReplica 
ReplicaDeletionSuccessful,前一个状态是ReplicaDeletionStarted
ReplicaDeletionIneligible, 前一个状态是ReplicaDeletionStarted
NonExistentReplica,被删除成功的replica,前一个状态是ReplicaDeletionSuccessful

和PartitionStateMachine一样,最关键的函数就是状态机函数


KafkaController

KafkaController中定义,
partition和replica的状态机
controllerElector, controller作为master可以简单的select partition的leader,但如果controller dead,需要重新在brokers中选取controller,这就需要基于ZK的elect算法
一系列的partition leader的selector用于在不用情况下select partition的leader

下面具体看下Controller上面实现的接口,包含对broker,controller,partition的系列操作

onBrokerStartup:
1. send UpdateMetadata requests for all partitions to newly started brokers 
2. replicas on the newly started broker –> OnlineReplica
3. partitions in OfflinePartition and NewPartition -> OnlinePartition (with OfflinePartitionLeaderSelector)
4. for partitions with replicas on newly started brokers, call onPartitionReassignment to complete any outstanding partition reassignment

onBrokerFailure:
1. partitions w/o leader –> OfflinePartition 
2. partitions in OfflinePartition and NewPartition -> OnlinePartition (with OfflinePartitionLeaderSelector)
3. each replica on the failed broker –> OfflineReplica


shutdownBroker:
1. each partition whose leader is on shutdown broker -> OnlinePartition (ControlledShutdownPartitionLeaderSelector)
2. each replica on shutdown broker that is follower, send StopReplica request (w/o deletion)
3. each replica on shutdown broker that is follower -> OfflineReplica (to force shutdown replica out of the isr)

 

onControllerFailover:
当通过zk electing出当前的broker为controller后,调用此函数做初始化工作
增加controller epoch
初始化controller context
startup channel manager,replica state machine和partition state machine
当初始化时发生任何问题, 都会放弃成为controller

 

onNewTopicCreation:
1. call onNewPartitionCreation


onNewPartitionCreation:
1. new partitions –> NewPartition 
2. all replicas of new partitions –> NewReplica
3. new partitions –> OnlinePartition
4. all replicas of new partitions –> OnlineReplica

onPartitionReassignment: (OAR: old assigned replicas; RAR: new re-assigned replicas when reassignment completes)
注释写的很详细,参考其中的例子
1. 在ZK中,将AR更新为OAR + RAR,由于在reassignment的过程中所有这些replicas都可以被认为是assign给该partition
2. 给every replica in OAR + RAR发送LeaderAndIsr request,更新ISR为OAR + RAR
3. 对于RAR - OAR,创建新的replicas,并把replica状态设为NewReplica
4. 等待所有RAR中的replicas和leader完成同步
5. 将RAR中的所有replicas设为OnlineReplica 
6. 在内存中,将AR设为RAR,但Zk里面仍然是OAR + RAR
7. 如果RAR中没有leader,elect一个新的leader
8. 将OAR - RAR中的设为OfflineReplica. shrink isr以remove OAR - RAR,并发送LeaderAndIsr通知leader,最后发送StopReplica(delete = false)
9. 将OAR - RAR中的设为NonExistentReplica 
10. 在ZK中,将AR设为RAR,代表reassignment完成


本文章摘自博客园,原文发布日期:2014-02-26 

目录
相关文章
|
1月前
|
消息中间件 存储 大数据
Apache Kafka: 强大消息队列系统的介绍与使用
Apache Kafka: 强大消息队列系统的介绍与使用
|
3月前
|
消息中间件 Kafka Linux
Apache Kafka-初体验Kafka(03)-Centos7下搭建kafka集群
Apache Kafka-初体验Kafka(03)-Centos7下搭建kafka集群
64 0
|
8天前
|
消息中间件 存储 Java
深度探索:使用Apache Kafka构建高效Java消息队列处理系统
【4月更文挑战第17天】本文介绍了在Java环境下使用Apache Kafka进行消息队列处理的方法。Kafka是一个分布式流处理平台,采用发布/订阅模型,支持高效的消息生产和消费。文章详细讲解了Kafka的核心概念,包括主题、生产者和消费者,以及消息的存储和消费流程。此外,还展示了Java代码示例,说明如何创建生产者和消费者。最后,讨论了在高并发场景下的优化策略,如分区、消息压缩和批处理。通过理解和应用这些策略,可以构建高性能的消息系统。
|
3月前
|
消息中间件 Java Kafka
Apache Kafka-初体验Kafka(04)-Java客户端操作Kafka
Apache Kafka-初体验Kafka(04)-Java客户端操作Kafka
31 0
|
2月前
|
消息中间件 安全 Kafka
2024年了,如何更好的搭建Kafka集群?
我们基于Kraft模式和Docker Compose同时采用最新版Kafka v3.6.1来搭建集群。
432 2
2024年了,如何更好的搭建Kafka集群?
|
3月前
|
消息中间件 存储 数据可视化
kafka高可用集群搭建
kafka高可用集群搭建
43 0
|
6月前
|
消息中间件 存储 Kubernetes
Helm方式部署 zookeeper+kafka 集群 ——2023.05
Helm方式部署 zookeeper+kafka 集群 ——2023.05
242 0
|
3月前
|
消息中间件 数据可视化 关系型数据库
ELK7.x日志系统搭建 4. 结合kafka集群完成日志系统
ELK7.x日志系统搭建 4. 结合kafka集群完成日志系统
151 0
|
4月前
|
消息中间件 存储 算法
Kafka Raft集群搭建
Kafka Raft集群搭建
73 0
|
1月前
|
消息中间件 存储 缓存
Kafka【基础知识 02】集群+副本机制+数据请求+物理存储+数据存储设计(图片来源于网络)
【2月更文挑战第20天】Kafka【基础知识 02】集群+副本机制+数据请求+物理存储+数据存储设计(图片来源于网络)
29 1

推荐镜像

更多