Apache Kafka源码分析 – Controller

简介:

One of the brokers is elected as the controller for the whole cluster. It will be responsible for:

  1. Leadership change of a partition (each leader can independently update ISR)
  2. New topics; deleted topics
  3. Replica re-assignment

After the controller makes a decision, it publishes the decision permanently in ZK and also sends the new decisions to affected brokers through direct RPC. The published decisions are the source of truth and they are used by clients for request routing and by each broker during startup to recover its state. After the broker is started, it picks up new decisions made by the controller through RPC.

Potential benefits:

  1. Easier debugging since leadership changes are made in a central place.
  2. ZK reads/writes needed for leadership changes can be batched (also easier to exploit ZK multi) and thus reduce end-to-end latency during failover.
  3. Fewer ZK watchers.
  4. More efficient communication of state changes by using direct RPC, instead of via a queue implementation in Zookeeper.

Potential downside:

  1. Need controller failover. 

ControllerContext

其中关键是记录所有topics的partitions和replicas间关系,包括assignment和leadship关系

PartitionStateMachine

加入replica是很复杂的设计,其中重要的一点就是要考虑partitions和replicas中不同情况下的处理
这里先看看Partition的各种state,以及state之间的状态机
NonExistentPartition,不存在或被删掉的p,前一个状态是OfflinePartition
NewPartition, 刚被创建,但还没有完成leader election, 前一个状态是NonExistentPartition
OnlinePartition,具有leader,正常状态,前一个状态是NewPartition/OfflinePartition
OfflinePartition,leader die的时候partition会变为offline,前一个状态是NewPartition/OnlinePartition
这里NewPartition和OfflinePartition都是没有leader的状态,为何要区别开来?见下


核心函数handleStateChanges,对于每个topicAndPartition调用状态机函数handleStateChange,然后把结果告诉每个brokers

状态机函数的具体逻辑, 其他的状态变化都比较简单
唯有,在变成OnlinePartition的时候,需要区分new或offline两种状况,下面具体看下


对于初始化leader只需要取出liveAssignedReplicas的head
而对于offline,需要优先ISR中的,然后才是AR中的 (初始时,AR应该=ISR,但因为会通过shrink isr来调整isr列表,所以AR应该是>=ISR的)

NewPartition->OnlinePartition

OfflinePartition,OnlinePartition->OnlinePartition 
OfflinePartitionLeaderSelector

ReplicaStateMachine

对应于Replica也有比较复杂的状态和相应的状态机
NewReplica,新创建的,只能接受become follower请求,前一个状态为NonExistentReplica
OnlineReplica,正常状态,可以接受become leader和become follower,前一个状态为NewReplica, OnlineReplica or OfflineReplica
OfflineReplica ,replica die,一般由于存储该replica的broker down,前一个状态为NewReplica, OnlineReplica
Replica的删除有3种状态,offline的replica就可以被delete
ReplicaDeletionStarted,前一个状态是OfflineReplica 
ReplicaDeletionSuccessful,前一个状态是ReplicaDeletionStarted
ReplicaDeletionIneligible, 前一个状态是ReplicaDeletionStarted
NonExistentReplica,被删除成功的replica,前一个状态是ReplicaDeletionSuccessful

和PartitionStateMachine一样,最关键的函数就是状态机函数


KafkaController

KafkaController中定义,
partition和replica的状态机
controllerElector, controller作为master可以简单的select partition的leader,但如果controller dead,需要重新在brokers中选取controller,这就需要基于ZK的elect算法
一系列的partition leader的selector用于在不用情况下select partition的leader

下面具体看下Controller上面实现的接口,包含对broker,controller,partition的系列操作

onBrokerStartup:
1. send UpdateMetadata requests for all partitions to newly started brokers 
2. replicas on the newly started broker –> OnlineReplica
3. partitions in OfflinePartition and NewPartition -> OnlinePartition (with OfflinePartitionLeaderSelector)
4. for partitions with replicas on newly started brokers, call onPartitionReassignment to complete any outstanding partition reassignment

onBrokerFailure:
1. partitions w/o leader –> OfflinePartition 
2. partitions in OfflinePartition and NewPartition -> OnlinePartition (with OfflinePartitionLeaderSelector)
3. each replica on the failed broker –> OfflineReplica


shutdownBroker:
1. each partition whose leader is on shutdown broker -> OnlinePartition (ControlledShutdownPartitionLeaderSelector)
2. each replica on shutdown broker that is follower, send StopReplica request (w/o deletion)
3. each replica on shutdown broker that is follower -> OfflineReplica (to force shutdown replica out of the isr)

 

onControllerFailover:
当通过zk electing出当前的broker为controller后,调用此函数做初始化工作
增加controller epoch
初始化controller context
startup channel manager,replica state machine和partition state machine
当初始化时发生任何问题, 都会放弃成为controller

 

onNewTopicCreation:
1. call onNewPartitionCreation


onNewPartitionCreation:
1. new partitions –> NewPartition 
2. all replicas of new partitions –> NewReplica
3. new partitions –> OnlinePartition
4. all replicas of new partitions –> OnlineReplica

onPartitionReassignment: (OAR: old assigned replicas; RAR: new re-assigned replicas when reassignment completes)
注释写的很详细,参考其中的例子
1. 在ZK中,将AR更新为OAR + RAR,由于在reassignment的过程中所有这些replicas都可以被认为是assign给该partition
2. 给every replica in OAR + RAR发送LeaderAndIsr request,更新ISR为OAR + RAR
3. 对于RAR - OAR,创建新的replicas,并把replica状态设为NewReplica
4. 等待所有RAR中的replicas和leader完成同步
5. 将RAR中的所有replicas设为OnlineReplica 
6. 在内存中,将AR设为RAR,但Zk里面仍然是OAR + RAR
7. 如果RAR中没有leader,elect一个新的leader
8. 将OAR - RAR中的设为OfflineReplica. shrink isr以remove OAR - RAR,并发送LeaderAndIsr通知leader,最后发送StopReplica(delete = false)
9. 将OAR - RAR中的设为NonExistentReplica 
10. 在ZK中,将AR设为RAR,代表reassignment完成


本文章摘自博客园,原文发布日期:2014-02-26 

目录
相关文章
|
18天前
|
消息中间件 安全 Kafka
Apache Kafka安全加固指南:保护你的消息传递系统
【10月更文挑战第24天】在现代企业环境中,数据的安全性和隐私保护至关重要。Apache Kafka作为一款广泛使用的分布式流处理平台,其安全性直接影响着业务的稳定性和用户数据的安全。作为一名资深的Kafka使用者,我深知加强Kafka安全性的重要性。本文将从个人角度出发,分享我在实践中积累的经验,帮助读者了解如何有效地保护Kafka消息传递系统的安全性。
45 7
|
18天前
|
消息中间件 数据挖掘 Kafka
Apache Kafka流处理实战:构建实时数据分析应用
【10月更文挑战第24天】在当今这个数据爆炸的时代,能够快速准确地处理实时数据变得尤为重要。无论是金融交易监控、网络行为分析还是物联网设备的数据收集,实时数据处理技术都是不可或缺的一部分。Apache Kafka作为一款高性能的消息队列系统,不仅支持传统的消息传递模式,还提供了强大的流处理能力,能够帮助开发者构建高效、可扩展的实时数据分析应用。
64 5
|
16天前
|
消息中间件 Ubuntu Java
Ubuntu系统上安装Apache Kafka
Ubuntu系统上安装Apache Kafka
|
17天前
|
消息中间件 监控 Kafka
Apache Kafka 成为处理实时数据流的关键组件。Kafka Manager 提供了一个简洁的 Web 界面
随着大数据技术的发展,Apache Kafka 成为处理实时数据流的关键组件。Kafka Manager 提供了一个简洁的 Web 界面,方便管理和监控 Kafka 集群。本文详细介绍了 Kafka Manager 的部署步骤和基本使用方法,包括配置文件的修改、启动命令、API 示例代码等,帮助你快速上手并有效管理 Kafka 集群。
39 0
|
3月前
|
存储 消息中间件 Java
Apache Flink 实践问题之原生TM UI日志问题如何解决
Apache Flink 实践问题之原生TM UI日志问题如何解决
45 1
|
28天前
|
SQL Java API
Apache Flink 2.0-preview released
Apache Flink 社区正积极筹备 Flink 2.0 的发布,这是自 Flink 1.0 发布以来的首个重大更新。Flink 2.0 将引入多项激动人心的功能和改进,包括存算分离状态管理、物化表、批作业自适应执行等,同时也包含了一些不兼容的变更。目前提供的预览版旨在让用户提前尝试新功能并收集反馈,但不建议在生产环境中使用。
575 13
Apache Flink 2.0-preview released
|
1月前
|
存储 缓存 算法
分布式锁服务深度解析:以Apache Flink的Checkpointing机制为例
【10月更文挑战第7天】在分布式系统中,多个进程或节点可能需要同时访问和操作共享资源。为了确保数据的一致性和系统的稳定性,我们需要一种机制来协调这些进程或节点的访问,避免并发冲突和竞态条件。分布式锁服务正是为此而生的一种解决方案。它通过在网络环境中实现锁机制,确保同一时间只有一个进程或节点能够访问和操作共享资源。
63 3
|
2月前
|
SQL 消息中间件 关系型数据库
Apache Doris Flink Connector 24.0.0 版本正式发布
该版本新增了对 Flink 1.20 的支持,并支持通过 Arrow Flight SQL 高速读取 Doris 中数据。
|
3月前
|
消息中间件 监控 数据挖掘
基于RabbitMQ与Apache Flink构建实时分析系统
【8月更文第28天】本文将介绍如何利用RabbitMQ作为数据源,结合Apache Flink进行实时数据分析。我们将构建一个简单的实时分析系统,该系统能够接收来自不同来源的数据,对数据进行实时处理,并将结果输出到另一个队列或存储系统中。
217 2
|
3月前
|
消息中间件 分布式计算 Hadoop
Apache Flink 实践问题之Flume与Hadoop之间的物理墙问题如何解决
Apache Flink 实践问题之Flume与Hadoop之间的物理墙问题如何解决
51 3

推荐镜像

更多