Apache Kafka源码分析 – Controller-阿里云开发者社区

开发者社区> 寒凝雪> 正文

Apache Kafka源码分析 – Controller

简介:
+关注继续查看

One of the brokers is elected as the controller for the whole cluster. It will be responsible for:

  1. Leadership change of a partition (each leader can independently update ISR)
  2. New topics; deleted topics
  3. Replica re-assignment

After the controller makes a decision, it publishes the decision permanently in ZK and also sends the new decisions to affected brokers through direct RPC. The published decisions are the source of truth and they are used by clients for request routing and by each broker during startup to recover its state. After the broker is started, it picks up new decisions made by the controller through RPC.

Potential benefits:

  1. Easier debugging since leadership changes are made in a central place.
  2. ZK reads/writes needed for leadership changes can be batched (also easier to exploit ZK multi) and thus reduce end-to-end latency during failover.
  3. Fewer ZK watchers.
  4. More efficient communication of state changes by using direct RPC, instead of via a queue implementation in Zookeeper.

Potential downside:

  1. Need controller failover. 

ControllerContext

其中关键是记录所有topics的partitions和replicas间关系,包括assignment和leadship关系

PartitionStateMachine

加入replica是很复杂的设计,其中重要的一点就是要考虑partitions和replicas中不同情况下的处理
这里先看看Partition的各种state,以及state之间的状态机
NonExistentPartition,不存在或被删掉的p,前一个状态是OfflinePartition
NewPartition, 刚被创建,但还没有完成leader election, 前一个状态是NonExistentPartition
OnlinePartition,具有leader,正常状态,前一个状态是NewPartition/OfflinePartition
OfflinePartition,leader die的时候partition会变为offline,前一个状态是NewPartition/OnlinePartition
这里NewPartition和OfflinePartition都是没有leader的状态,为何要区别开来?见下


核心函数handleStateChanges,对于每个topicAndPartition调用状态机函数handleStateChange,然后把结果告诉每个brokers

状态机函数的具体逻辑, 其他的状态变化都比较简单
唯有,在变成OnlinePartition的时候,需要区分new或offline两种状况,下面具体看下


对于初始化leader只需要取出liveAssignedReplicas的head
而对于offline,需要优先ISR中的,然后才是AR中的 (初始时,AR应该=ISR,但因为会通过shrink isr来调整isr列表,所以AR应该是>=ISR的)

NewPartition->OnlinePartition

OfflinePartition,OnlinePartition->OnlinePartition 
OfflinePartitionLeaderSelector

ReplicaStateMachine

对应于Replica也有比较复杂的状态和相应的状态机
NewReplica,新创建的,只能接受become follower请求,前一个状态为NonExistentReplica
OnlineReplica,正常状态,可以接受become leader和become follower,前一个状态为NewReplica, OnlineReplica or OfflineReplica
OfflineReplica ,replica die,一般由于存储该replica的broker down,前一个状态为NewReplica, OnlineReplica
Replica的删除有3种状态,offline的replica就可以被delete
ReplicaDeletionStarted,前一个状态是OfflineReplica 
ReplicaDeletionSuccessful,前一个状态是ReplicaDeletionStarted
ReplicaDeletionIneligible, 前一个状态是ReplicaDeletionStarted
NonExistentReplica,被删除成功的replica,前一个状态是ReplicaDeletionSuccessful

和PartitionStateMachine一样,最关键的函数就是状态机函数


KafkaController

KafkaController中定义,
partition和replica的状态机
controllerElector, controller作为master可以简单的select partition的leader,但如果controller dead,需要重新在brokers中选取controller,这就需要基于ZK的elect算法
一系列的partition leader的selector用于在不用情况下select partition的leader

下面具体看下Controller上面实现的接口,包含对broker,controller,partition的系列操作

onBrokerStartup:
1. send UpdateMetadata requests for all partitions to newly started brokers 
2. replicas on the newly started broker –> OnlineReplica
3. partitions in OfflinePartition and NewPartition -> OnlinePartition (with OfflinePartitionLeaderSelector)
4. for partitions with replicas on newly started brokers, call onPartitionReassignment to complete any outstanding partition reassignment

onBrokerFailure:
1. partitions w/o leader –> OfflinePartition 
2. partitions in OfflinePartition and NewPartition -> OnlinePartition (with OfflinePartitionLeaderSelector)
3. each replica on the failed broker –> OfflineReplica


shutdownBroker:
1. each partition whose leader is on shutdown broker -> OnlinePartition (ControlledShutdownPartitionLeaderSelector)
2. each replica on shutdown broker that is follower, send StopReplica request (w/o deletion)
3. each replica on shutdown broker that is follower -> OfflineReplica (to force shutdown replica out of the isr)

 

onControllerFailover:
当通过zk electing出当前的broker为controller后,调用此函数做初始化工作
增加controller epoch
初始化controller context
startup channel manager,replica state machine和partition state machine
当初始化时发生任何问题, 都会放弃成为controller

 

onNewTopicCreation:
1. call onNewPartitionCreation


onNewPartitionCreation:
1. new partitions –> NewPartition 
2. all replicas of new partitions –> NewReplica
3. new partitions –> OnlinePartition
4. all replicas of new partitions –> OnlineReplica

onPartitionReassignment: (OAR: old assigned replicas; RAR: new re-assigned replicas when reassignment completes)
注释写的很详细,参考其中的例子
1. 在ZK中,将AR更新为OAR + RAR,由于在reassignment的过程中所有这些replicas都可以被认为是assign给该partition
2. 给every replica in OAR + RAR发送LeaderAndIsr request,更新ISR为OAR + RAR
3. 对于RAR - OAR,创建新的replicas,并把replica状态设为NewReplica
4. 等待所有RAR中的replicas和leader完成同步
5. 将RAR中的所有replicas设为OnlineReplica 
6. 在内存中,将AR设为RAR,但Zk里面仍然是OAR + RAR
7. 如果RAR中没有leader,elect一个新的leader
8. 将OAR - RAR中的设为OfflineReplica. shrink isr以remove OAR - RAR,并发送LeaderAndIsr通知leader,最后发送StopReplica(delete = false)
9. 将OAR - RAR中的设为NonExistentReplica 
10. 在ZK中,将AR设为RAR,代表reassignment完成


本文章摘自博客园,原文发布日期:2014-02-26 

版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

相关文章
1月28日云栖精选夜读 | 终于等到你!阿里正式向 Apache Flink 贡献 Blink 源码
如同我们去年12月在 Flink Forward China 峰会所约,阿里巴巴内部 Flink 版本 Blink 将于 2019 年 1 月底正式开源。今天,我们终于等到了这一刻。
3865 0
Linux USB Host-Controller的初始化代码框架分析【转】
转自:http://blog.csdn.net/zkami/article/details/2496770 usb_hcd_omap_probe (const struct hc_driver *driver) (dev/ohci/ohci-omap.
788 0
怎么设置阿里云服务器安全组?阿里云安全组规则详细解说
阿里云服务器安全组设置规则分享,阿里云服务器安全组如何放行端口设置教程
6362 0
Apache Flink CDC 批流融合技术原理分析
以 Flink SQL 案例来介绍 Flink CDC 2.0 的使用,并解读 CDC 中的核心设计。
508 0
Apache Spark Delta Lake 事务日志实现源码分析
Apache Spark Delta Lake 事务日志实现源码分析 我们已经在这篇文章详细介绍了 Apache Spark Delta Lake 的事务日志是什么、主要用途以及如何工作的。那篇文章已经可以很好地给大家介绍 Delta Lake 的内部工作原理,原子性保证,本文为了学习的目的,带领大家从源码级别来看看 Delta Lake 事务日志的实现。
1550 0
Kafka or RabbitMQ:消息中间件选型深入分析
一、前言 消息队列中间件(简称消息中间件)是指利用高效可靠的消息传递机制进行与平台无关的数据交流,并基于数据通信来进行分布式系统的集成。通过提供消息传递和消息排队模型,它可以在分布式环境下提供应用解耦、弹性伸缩、冗余存储、流量削峰、异步通信、数据同步等等功能,其作为分布式系统架构中的一个重要组件,有着举足轻重的地位。
2055 0
+关注
5854
文章
223
问答
文章排行榜
最热
最新
相关电子书
更多
文娱运维技术
立即下载
《SaaS模式云原生数据仓库应用场景实践》
立即下载
《看见新力量:二》电子书
立即下载