kafka Detailed Replication Design V3

简介:

参考,https://cwiki.apache.org/confluence/display/KAFKA/kafka+Detailed+Replication+Design+V3

Major changes compared with the v2 proposal
最大的不同在于加入Controller,简化partition的leader electing
并且除了将改动更新到ZK上以外,controller会通过ControllerChannelManager直接和brokers通信,以提高效率,并减少ZK上watcher的数量

  • Leadership changes are now made by a controller.
  • The controller detects broker failures and elects a new leader for each affected partition.
  • Each leadership change is communicated by the controller to each affected broker.
  • The communication between the controller and the broker is done through direct RPC, instead of via Zookeeper.

Overview:

One of the brokers is elected as the controller for the whole cluster. It will be responsible for:

  1. Leadership change of a partition (each leader can independently update ISR)
  2. New topics; deleted topics
  3. Replica re-assignment

After the controller makes a decision, it publishes the decision permanently in ZK and also sends the new decisions to affected brokers through direct RPC. The published decisions are the source of truth and they are used by clients for request routing and by each broker during startup to recover its state. After the broker is started, it picks up new decisions made by the controller through RPC.

Potential benefits:

  1. Easier debugging since leadership changes are made in a central place.
  2. ZK reads/writes needed for leadership changes can be batched (also easier to exploit ZK multi) and thus reduce end-to-end latency during failover.
  3. Fewer ZK watchers.
  4. More efficient communication of state changes by using direct RPC, instead of via a queue implementation in Zookeeper.

Potential downside:

  1. Need controller failover.

 

Paths:

  1. Controller path: stores the current controller info.

    /controller --> {brokerid} (ephemeral; created by controller)

  2. Broker path: stores the information of all live brokers.

    /brokers/ids/[broker_id] --> host:port (ephemeral; created by admin)

  3. Topic path: stores the replication assignment for all partitions in a topic. For each replica, we store the id of the broker to which the replica is assigned. The first replica is the preferred replica. Note that for a given partition, there is at most 1 replica on a broker. Therefore, the broker id can be used as the replica id

    /brokers/topics/[topic] --> {part1: [broker1, broker2], part2: [broker2, broker3] ...}  (created by admin)

  4. LeaderAndISR path: stores leader and ISR of a partition

    /brokers/topics/[topic]/[partition_id]/leaderAndISR --> {leader_epoc: epoc, leader: broker_id, ISR: {broker1, broker2}}

    This path is updated by the controller or the current leader. The current leader only updates the ISR part.
    Updating the path requires synchronization using conditional updates to Zookeeper.

  5. PartitionReassignment path: This path is used when we want to reassign some partitions to a different set of brokers. For each partition to be reassigned, it stores a list of new replicas and their corresponding assigned brokers. This path is created by an administrative process and is automatically removed once the partition has been moved successfully

    /admin/partitions_reassigned/[topic]/[partition_id] --> {broker_id …} (created by admin) 


  6. PartitionAdd and PartitionRemove path: The paths are used when we want to add/remove partitions to an existing topic. For each partition to be added, the PartitionAdd path stores a list of new replicas and their corresponding assigned brokers. The paths are created by an administrative process and are automatically removed by the controller once the change is complete.

    /admin/partitions_add/[topic]/[partition_id] --> {broker_id …} (created by admin)
    /admin/partitions_remove/[topic]/[partition_id] (created by admin)

 

Terminologies:
AR: assigned replicas, ISR: in-sync replicas

 

A. Failover during broker failure.
首先这个是由/brokers/ids path上watcher触发的,关键要处理那些由于broker down,而导致leader fail的partitions
对于这些partitions,需要在ISR或AR中select出新的leader,并更新ISR
最终把最新的LeaderAndISR更新到Zk,并通过LeaderAndISRCommand发送给affected brokers

Controller watches child changes of /brokers/ids path. When the watcher gets triggered, it calls on_broker_change().


B. Creating/deleting topics
比较简单,对于Creating topics, 关键在于init_leaders
对于deleting topics,关键在于发送StopReplicaCommand to all affected brokers

The controller watches child change of /brokers/topics. When the watcher gets triggered, it calls on_topic_change().


C. Broker acts on commands from the controller
Broker主要从controller接受两种command,LeaderAndISRCommand和StopReplicaCommand
对于LeaderAndISRCommand,根据最新的LeaderAndISR信息,更新local的partition的状态
无非是,通过becomeLeader将本来不是leader的partition replica变为leader,或使用becomeFollower反之

而对于StopReplicaCommand,停止ReplicaFetcherThread和HW checkpoint Thread,并删除local的partition目录

Each broker listens to commands from the controller through RPC.

D. Handling controller failure.

Each broker sets an exists watcher on the ControllerPath. When the watcher gets triggered, it calls on_controller_failover().
Basically, the controller needs to inform all brokers the current states stored in ZK (since some state change commands could be lost during the controller failover).

E. Broker startup
从ZK读取replica assignment和LeaderAndISR信息
创建这些replicas,设置为leader或follower
删除那些已经不被assign给该broker的partitions

When a broker starts up, it calls on_broker_startup(). Basically, the broker needs to first read the current state of each partition from ZK.

F. Replica reassignment:

Controller watches child changes in the PartitionReassignmentPath in ZK. The value of this path contains RAR, the set of brokers that a particular partition will be reassigned to. When the watcher gets triggered, it calls on_partitions_reassigned().


G. Follower fetching from leader and leader advances HW
首先leader和follower之间的数据同步是通过pull而非push的方式,why?
作为Follower会不断的往leader发送ReplicaFetcherRequests来同步数据,leader会把从request中offset开始的messages数据,作为response发送回去
而leader只有在ISR里面所有的follower都已经收到这个message后,才会advance HW到该offset,即leader.hw = min of leo of every replica in ISR
明显的一个问题是,如果某个follower dead或very slow,就会影响到leader commit数据,所以leader需要不断的将这些slow or dead的follower从ISR中踢出去,(通过maybe_change_ISR() )
条件就是leader.leo - leo_min > MAX_BYTES_LAG (slow)或者该follower的leo_min超过MAX_TIME_LAG没有更新(dead)
当然当follower恢复能跟上leader的时候,还会把它加回ISR中,

A follower keeps sending ReplicaFetcherRequests to the leader. A leader advances its HW when every replica in ISR has received messages up to that point. The process at the leader and the follower are described below.

H. Add/remove partitions to an existing topic

Controller watches child changes in the PartitionAdd and the PartitionRemove Path in ZK. When the watcher gets triggered, it calls on_partitions_add() and on_partition_remove(), respectively.


Discussions:

1. End-to-end latency during a broker failure:

增加replica模块,就是为了应对broker failure的case,下面说如果要为10K partitions change leader,需要花费40s

  1. broker shutdown (after closing socket server, need to close request handler, close log)
  2. broker watcher gets triggered in controller
  3. make leadership change and publish the new leader/ISR in ZK (1 ZK write per affected partition)
  4. inform the leadership change to each broker by write to ZKQueue (1 ZK write per broker)
  5. leader waits for followers in ISR to connect (Kafka PRC)
  6. follower truncates its log first (a potential I/O) and then starts fetching from leader

In the critical path, the most time consuming operation is step 3 where we need to write 1 ZK path per partition. Assuming that during a broker failover we need to change leader for 10K partitions and each ZK write takes 4ms, this could take 40 secs. One possibility is to use the multi() support in ZK 3.4 to batch those writes in 1 ZK operation.

2. ZKQueue vs direct RPC:

这里的设计就是balance,如果单纯通过Zk,效率会低点,包含ZK write,watcher firing,Zk read,但是这样代码逻辑简单,而且耦合度低
在controller中直接通过RPC去和brokers通信,效率高点,只需要一次RPC,但是增加了耦合度
个人觉得,如果不是操作很频繁和效率敏感,没有理由增加RPC机制。。。

Communicating between the controller and the brokers via ZK is not efficient. Each communication requires 2 ZK writes (each costs roughly 2 RPC), 1 watcher firing and 1 ZK read. These add up to roughly 6 RPCs per communication. An alternative is to implement an admin RPC in the broker for direct communication between the controller and the brokers. Then each communication costs only 1 RPC. The admin RPC could specify a timeout, during which it expects the admin command to be completed. Using RPC means that when a broker is down, it could miss some commands from the controller. This proposal requires that the broker recover its state during startup by reading state information stored in ZK.

3. Dealing with multiple leaders in transition

这里解释的问题是如果有多个leader出现会怎么样?
场景是initail leader A,暂时dead,然后选了新的leader B,一会A又回来了,这时A和B都会认为自己是leader,那会不会引起问题,导致A和B都可以commit数据量?
答案是不会的,因为现在commit数据,必须要所有follower receive这个message,而当B成为leader时会先关闭ReplicaFetcherThread(从A),所以B是无法从A获取messge的
那么这时A肯定要shrinking the ISR来去掉B,但是在更新ZK时,它会发现ZK中的LeaderAndISR的版本是新的,所以这时A就会意识到它已经不是leader了

Occasionally, it's possible for multiple brokers to simultaneous assume that they are the leader of a partition. For example, broker A is the initial leader of a partition and the ISR of that partition is {A,B,C}.. Then, broker A goes into GC and losses its ZK registration. The controller assumes that broker A is dead, assigns the leader of the partition to broker B and sets the new ISR in ZK to {B,C}. Broker B becomes the leader and at the same time, Broker A wakes up from GC but hasn't acted on the leadership change command sent by the controller. Now, both broker A and B think they are the leader. It would be bad if we allow both broker A and B to commit new messages since the data among replicas will be out of sync. Our current design actually will prevent this from happening in this situation. Here is why. The claim is that after broker B becomes the new leader, broker A can no longer commit new messages any more. For broker A to commit a message m, it needs every replica in ISR to receive m. At the moment, broker A still thinks the ISR is {A,B,C} (its local copy; although the ISR in ZK has changed). Broker B will never receive message m. This is because by becoming the new leader, it must have first stopped fetching data from the previous leader. Therefore broker A can't commit message m without shrinking the ISR first. In order to shrink ISR, broker A has to write the new ISR in ZK. However, it can't do that because it will realize that the LeaderAndISR path in ZK is not on a version that it assumes to be (since it has already been changed by the controller). At this moment, broker A will realize that it's no longer the leader any more.Question A.3, is broker down the only failure scenario that we worry about? Do we worry about leader failure at individual partition level?

4. Is broker down the only failure scenario that we worry about? Do we worry about leader failure at individual partition level?

It seems that common problems such as long GC, I/O on local storage will affect the whole broker.

5. How to deal with repeated topic deletion/creation?

当broker down的时候,某些topic被delete并重新创建,是否会有问题?是否要partition进行版本化以区分出outdated partition
当broker down的时候,新的replicas不会再被assign到该broker上,所以就算recreate相同的topics,也会在其他live的broker上创建新的replicas
所以当broker startup的时候会删除所有已经不被assign给自己的partition,so 没有问题

A broker can be down for a long time during which a topic can be deleted and recreated. Do we need partition version id in order to clean up an outdated partition from local storage? The way our replica assignment works is that we only assign replicas to live brokers. This means that while a broker is down, no new replicas will be assigned to it. So, it seems that we won't get into the situation that when a failed broker comes back, a partition is still assigned to this broker and yet the partition stored locally is outdated. So, we don't really need partition version id to address this issue. On broker startup, the broker will read all local partition directories, delete each directory whose partition is no longer assigned to itself, and then load the last segment of each of the remaining directories.

6. What happens to client routing during a broker failover?

Controller在将新的LeaderAndISR写入ZK后,然后通过RPC通知每个broker,broker在做一系列操作来切换leader和follower
但是当Zk中被更新后,client就已经可以看到,但此时到broker真正完成切换ready有一个时间间隔,有可能client访问到该broker的时候,还没有ready

有个简单的想法是让broker中完成切换后再更新ZK,但明显的问题是会带来不一致,因为controller是唯一的master只有他可以统一的更改LeaderAndISR,而每个broker中不同的时候去更新,过程中旧的leader仍然有权利更改ISR
比较好的方案是,如果是小概率事件,那么就让client失败retry好了

In this proposal, the controller first publishes the new leader for affected partitions in the LeaderAndISR path in ZK, then sends the leadership change commands to the brokers. The brokers then act on those leadership change commands. Since we use the LeaderAndISR path to route the client request, there is a window (potentially small) that a client is routed to a broker that's the new leader, but the broker is not ready to be the new leader yet.

For reference, HBase only updates the metadata (for client routing) after the regionserver responds to close/open region commands. So, one would think that instead of the controller directly updating the LeaderAndISR path, we can let each broker update that path after it completes the execution of the command. There is actually a critical reason that the leaderAndISR path has to be updated by the controller. This is because we rely on the leaderAndISR path in ZK to synchronize between the controller and the leader of a partition. After the controller makes a leadership change decision, it doesn't want the ISR to be changed by the current leader any more. Otherwise, the newly elected leader could be taken out of ISR by the current leader before the new leader takes over the leadership. By publishing the new leader immediately in the leaderAndISR path, the controller prevents the current leader from updating the ISR any more.
One possibility is to use another ZK path ExternalView for client routing. The controller only updates ExternalView after the broker responds positively for the leadership change command. There is a tradeoff between using 1 ExternalView path for all partitions or 1 ExternalView path per partition. The former has less ZK overhead, but potentially forces unnecessary rebalancing on the consumers.Aonther way to think about this is that in the normal case, leadership change commands are executed very quickly. So we probably can just rely on client side retry logic to handle the transition period. In the uncommon case that somehow it takes too long for a broker to become a leader (likely due to a bug), the controller will get a timeout and can trigger an alert so that an admin can take a look at it. So, we probably can start without the ExternalView path and reconsider it if it's really needed.

7. Dealing with offsets beyond HW in fetch requests during leadership change

Follower的HW总是比leader要lag的,所以在leader切换时,client可能会仍然根据老的leader的HW发起request, 这样会落在新的leader的HW and LEO之间,现在的处理是返回empty message set

In general, the HW in the follower always lags that in the leader. So, during a leadership change, a consumer client could be requesting an offset between the new leader's HW and LEO. Normally, the server will return an OffsetOutOfRangeException to the client. In this particular case, the client request is actually valid. To deal with this case, the server can return an empty message set to the consumer if the requested offset is between HW and LEO.

8. Can follower keep up with the leader?

In general, we need to have as much I/O parallelism in the follower as in the leader. Probably need to think a bit more on this.


本文章摘自博客园,原文发布日期:2014-02-28

目录
相关文章
|
1月前
|
消息中间件 JSON 大数据
大数据-66 Kafka 高级特性 分区Partition 副本因子Replication Factor replicas动态修改 线上动态修改副本数
大数据-66 Kafka 高级特性 分区Partition 副本因子Replication Factor replicas动态修改 线上动态修改副本数
41 1
|
1月前
|
消息中间件 存储 运维
为什么说Kafka还不是完美的实时数据通道
【10月更文挑战第19天】Kafka 虽然作为数据通道被广泛应用,但在实时性、数据一致性、性能及管理方面存在局限。数据延迟受消息堆积和分区再平衡影响;数据一致性难以达到恰好一次;性能瓶颈在于网络和磁盘I/O;管理复杂性涉及集群配置与版本升级。
|
1月前
|
消息中间件 Java Kafka
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
49 1
|
3月前
|
消息中间件 Java Kafka
Kafka不重复消费的终极秘籍!解锁幂等性、偏移量、去重神器,让你的数据流稳如老狗,告别数据混乱时代!
【8月更文挑战第24天】Apache Kafka作为一款领先的分布式流处理平台,凭借其卓越的高吞吐量与低延迟特性,在大数据处理领域中占据重要地位。然而,在利用Kafka进行数据处理时,如何有效避免重复消费成为众多开发者关注的焦点。本文深入探讨了Kafka中可能出现重复消费的原因,并提出了四种实用的解决方案:利用消息偏移量手动控制消费进度;启用幂等性生产者确保消息不被重复发送;在消费者端实施去重机制;以及借助Kafka的事务支持实现精确的一次性处理。通过这些方法,开发者可根据不同的应用场景灵活选择最适合的策略,从而保障数据处理的准确性和一致性。
297 9
|
3月前
|
消息中间件 负载均衡 Java
"Kafka核心机制揭秘:深入探索Producer的高效数据发布策略与Java实战应用"
【8月更文挑战第10天】Apache Kafka作为顶级分布式流处理平台,其Producer组件是数据高效发布的引擎。Producer遵循高吞吐、低延迟等设计原则,采用分批发送、异步处理及数据压缩等技术提升性能。它支持按消息键值分区,确保数据有序并实现负载均衡;提供多种确认机制保证可靠性;具备失败重试功能确保消息最终送达。Java示例展示了基本配置与消息发送流程,体现了Producer的强大与灵活性。
68 3
|
3月前
|
vr&ar 图形学 开发者
步入未来科技前沿:全方位解读Unity在VR/AR开发中的应用技巧,带你轻松打造震撼人心的沉浸式虚拟现实与增强现实体验——附详细示例代码与实战指南
【8月更文挑战第31天】虚拟现实(VR)和增强现实(AR)技术正深刻改变生活,从教育、娱乐到医疗、工业,应用广泛。Unity作为强大的游戏开发引擎,适用于构建高质量的VR/AR应用,支持Oculus Rift、HTC Vive、Microsoft HoloLens、ARKit和ARCore等平台。本文将介绍如何使用Unity创建沉浸式虚拟体验,包括设置项目、添加相机、处理用户输入等,并通过具体示例代码展示实现过程。无论是完全沉浸式的VR体验,还是将数字内容叠加到现实世界的AR应用,Unity均提供了所需的一切工具。
141 0
|
3月前
|
消息中间件 存储 关系型数据库
实时计算 Flink版产品使用问题之如何使用Kafka Connector将数据写入到Kafka
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
3月前
|
消息中间件 监控 Kafka
实时计算 Flink版产品使用问题之处理Kafka数据顺序时,怎么确保事件的顺序性
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
3月前
|
消息中间件 缓存 Kafka
【Azure 事件中心】使用Kafka消费Azure EventHub中数据,遇见消费慢的情况可以如何来调节呢?
【Azure 事件中心】使用Kafka消费Azure EventHub中数据,遇见消费慢的情况可以如何来调节呢?
下一篇
无影云桌面