Sticky策略
这样的分区策略是从0.11版本才开始引入的,它主要有两个目的
- 分区的分配要尽可能的均匀,分配给消费者者的主题分区数最多相差一个
- 分区的分配要尽可能与上次分配的保持相同
举例进行分析:比如有3个消费者(C0,C1,C2),都订阅了2个主题(T0 和 T1)并且每个主题都有 3 个分区(p0、p1、p2),那么所订阅的所有分区可以标识为T0p0、T0p1、T0p2、T1p0、T1p1、T1p2。此时使用Sticky分配策略后,得到的分区分配结果和RoudRobin相同:
但如果这里假设C2故障退出了消费者组,然后需要对分区进行再平衡操作,如果使用的是RoundRobin分配策略,它会按照消费者C0和C1进行重新轮询分配,再平衡后的结果如下:
但是如果使用的是Sticky分配策略,再平衡后的结果会是这样:
虽然触发了再分配,但是记忆了上一次C0和C1的分配结果。这样的好处是发生分区重分配后,对于同一个分区而言有可能之前的消费者和新指派的消费者不是同一个,对于之前消费者进行到一半的处理还要在新指派的消费者中再次处理一遍,这时就会浪费系统资源。而使用Sticky策略就可以让分配策略具备一定的“粘性”,尽可能地让前后两次分配相同,进而可以减少系统资源的损耗以及其它异常情况的发生
offset的维护
在现实情况下,消费者在消费数据时可能会出现各种会导致宕机的故障问题,这个时候,如果消费者后续恢复了,它就需要从发生故障前的位置开始继续消费,而不是从头开始消费。所以消费者需要实时的记录自己消费到了哪个offset,便于后续发生故障恢复后继续消费。Kafka 0.9版本之前,consumer默认将offset保存在Zookeeper中,从0.9版本开始,consumer默认将offset保存在Kafka一个内置的topic中,该topic为 __consumer_offsets
:
同一个组里的,当动态扩展分区分配时新进入的消费者接着消费分区消息而不是重新消费。offset是按照:goup+topic+partion来划分的,这样保证组内机器有问题时能接着消费
Zookeeper管理
在基于 Kafka 的分布式消息队列中,ZooKeeper 的作用有:Producer端注册及管理、Consumer端注册及管理以及Kafka集群策略管理 等。
Producer端注册及管理
在Producer端Zookeeper能够实现:注册并动态调整broker,注册并动态调整topic,Producers负载均衡。
注册并动态调整Broker
broker是注册在zookeeper中的,还记得在分布式集群搭建的时候,我们在zk的配置文件中添加的服务节点,就是用来注册broker的。
- 存放地址:为了记录 broker 的注册信息,在 ZooKeeper 上,专门创建了属于 Kafka 的一个节点,其路径为 /brokers
- 创建节点: Kafka 的每个 broker 启动时,都会到 ZooKeeper 中进行注册,告诉 ZooKeeper 其 broker.id,在整个集群中,broker.id 应该全局唯一,并在 ZooKeeper 上创建其属于自己的节点,其节点路径为
/brokers/ids/{broker.id}
; 创建完节点后,Kafka 会将该 broker 的 broker.name 及端口号记录到该节点; - 删除节点:该 broker 节点属性为临时节点,当 broker 会话失效时,ZooKeeper 会删除该节点,这样,我们就可以很方便的监控到broker 节点的变化,及时调整负载均衡等。
当然注册完Broker还需要注册Topic
注册并动态调整Topic
在 Kafka 中,所有 topic 与 broker 的对应关系都由 ZooKeeper 进行维护,在 ZooKeeper 中,建立专门的节点来记录这些信息,其节点路径为 /brokers/topics/{topic_name}
前面说过,为了保障数据的可靠性,每个 Topic 的 Partitions 实际上是存在备份的,并且备份的数量由 Kafka 机制中的 replicas 来控制。
Producers负载均衡
对于同一个 topic 的不同 partition,Kafka会尽力将这些 partition 分布到不同的 broker 服务器上,这种均衡策略实际上是基于 ZooKeeper 实现的。
- 监听broker变化,producers 启动后也要到 ZooKeeper 下注册,创建一个临时节点来监听 broker 服务器列表的变化。由于ZooKeeper 下 broker 创建的也是临时节点,当 brokers 发生变化时,producers 可以得到相关的通知,从改变自己的 broker list。
- 监听topic变化,topic 的变化以及broker 和 topic 的关系变化,也是通过 ZooKeeper 的 Watcher 监听实现的
当broker变化以及topic变化的时候,zookeeper能监听到,并控制消息和分区的分布。
Kafka集群策略管理
除了生产者涉及的管理行为,在我们前面提到的故障转移机制以及分区策略等内容中相关的其它管理行为也是由Zookeeper完成的
- 选举leader,Kafka 为每一个 partition 找一个节点作为 leader,其余备份作为 follower,如果 leader 挂了,follower 们会选举出一个新的 leader 替代,继续业务
- 副本同步,当 producer push 的消息写入 partition(分区) 时,作为 leader 的 broker(Kafka 节点) 会将消息写入自己的分区,同时还会将此消息复制到各个 follower,实现同步。
- 维护ISR,如果某个follower 挂掉,leader 会再找一个替代并同步消息
所有的这些操作都是Zookeeper做的。
Consumer端注册及管理
在Consumer端Zookeeper能够实现:注册并动态调整Consumer,Consumer负载均衡。
注册并动态调整Consumer
在消费者端ZooKeeper 做的工作有那些呢?
- 注册新的消费者分组,当新的消费者组注册到 ZooKeeper 中时,ZooKeeper 会创建专用的节点来保存相关信息,其节点路径为
/consumers/{group_id}
,其节点下有三个子节点,分别为 [ids, owners, offsets]。
- ids 节点:记录该消费组中当前正在消费的消费者,记录分组下消费者
- owners 节点:记录该消费组消费的 topic 信息,
/consumers/[group_id]/owners/[topic]/[broker_id-partition_id]
,其中,[broker_id-partition_id]
就是一个消息分区的标识,节点内容就是该 消息分区上消费者的Consumer ID,这样分区和消费者就能关联起来了。关联分区和消费者 - offsets 节点:记录每个 topic 的每个分区offset,在消费者对指定消息分区进行消息消费的过程中,需要定时将分区消息的消费进度Offset记录到Zookeeper上,以便在该消费者进行重启或者其他消费者重新接管该消息分区的消息消费后,能从之前进度继续消息消费。Offset在Zookeeper中由一个专门节点进行记录,其节点路径为:
/consumers/[group_id]/offsets/[topic]/[broker_id-partition_id]
节点内容就是Offset的值,记录消费者offset,当然新版本的不记录在zookeeper中
- 注册新的消费者,当新的消费者注册到 Kafka 中时,会在
/consumers/{group_id}/ids
节点下创建临时子节点,并记录相关信息。 - 监听消费者分组中消费者的变化,每个消费者都要关注其所属消费者组中消费者数目的变化,即监听
/consumers/{group_id}/ids
下子节点的变化。一但发现消费者新增或减少,就会触发消费者的负载均衡。
其实不光是注册consumer,还包括对消费者策略的管理,例如Consumer负载均衡
Consumer负载均衡
Consumer在启动时会到 ZooKeeper下以自己的 Consumer-id 创建临时节点 /consumer/[group-id]/ids/[conusmer-id]
,并对 /consumer/[group-id]/ids
注册监听事件:
- 监听消费者列表,当消费者发生变化时,同一 group 的其余消费者会得到通知。
- 监听broker列表,消费者还要监听 broker 列表的变化。
然后按照我们之前提到的策略进行排序和消费
Kafka框架搭建实战
好了,在了解了这么多基础知识以及核心原理之后,我们再来看看真正的实战场景是如何操作的。我们有如下场景,假设我们要从当前站点发送一个导出消息给导出ESB,由导出ESB处理业务逻辑来达到解耦的目标该怎么通过Kafka实现呢?
发送消息
我们发送Kafka消息的时候,外层的封装方法如下,需要传递一个Kafka的topic、一个用来计算Partition【路由转发】的标识key【tenantId】,以及需要传递的消息。
public static bool SendKafkaExportData( string appName, int tenantId, int userId, string metaObjName, string viewName, string exportFileName, SearchCondition condition, string version = null, int total = -1, ExportFileType fileType = ExportFileType.Xlsx, string applicationContext = null, string msgTemplate = null) { Common.HelperObjects.ArgumentHelper.AssertNotEmpty(appName, nameof (appName)); Common.HelperObjects.ArgumentHelper.AssertNotEmpty(metaObjName, nameof (metaObjName)); Common.HelperObjects.ArgumentHelper.AssertNotEmpty(viewName, nameof (viewName)); Common.HelperObjects.ArgumentHelper.AssertNotEmpty(exportFileName, nameof (exportFileName)); Common.HelperObjects.ArgumentHelper.AssertPositive(tenantId, nameof (tenantId)); Common.HelperObjects.ArgumentHelper.AssertPositive(userId, nameof (userId)); Common.HelperObjects.ArgumentHelper.AssertNotNull<SearchCondition>(condition, nameof (condition)); bool flag = true; try { ExportRequestDataModel exportRequestData = ExportRequestDataModel.GetExportRequestData(appName, tenantId, userId, metaObjName, viewName, exportFileName, condition, version, total, fileType, applicationContext, msgTemplate); long num = KafkaProducer.Send<ExportRequestDataModel>("TMLSent", tenantId, exportRequestData); ExportRequestDataModel.logger.Debug((object) string.Format("{0}-{1}-{2}发送Kafka消息{3}成功", (object) appName, (object) tenantId, (object) userId, (object) num)); } catch (Exception ex) { ExportRequestDataModel.logger.Error((object) string.Format("{0}-{1}-{2}发送Kafka消息异常", (object) appName, (object) tenantId, (object) userId), ex); flag = false; } return flag; }
而其中的核心方法: long num = KafkaProducer.Send<ExportRequestDataModel>("TMLSent", tenantId, exportRequestData);
的实现逻辑如下,将kafka携带的消息序列化为二进制数组:
/// <summary>Send a message to a topic.</summary> /// <param name="topic">The name of the topic to send the message to.</param> /// <param name="tenant">The id of the tenant the message belongs to.</param> /// <param name="value">The message content.</param> /// <returns>The offset of the message.</returns> public static long Send<T>(string topic, int tenant, T value) where T : IBinarySerializable { ArgumentHelper.AssertNotEmpty(topic, nameof (topic)); ArgumentHelper.AssertPositive(tenant, nameof (tenant)); return KafkaProducer.Send(topic, tenant, (object) value == null ? (byte[]) null : BigEndianEncoder.Encode<T>(value)); }
消息发送机制如下,获取到需要的topic,用于计算Partition的标识tenantId以及序列化后可以直接发送的二进制字符串消息:
/// <summary>Send a message to a topic.</summary> /// <param name="topic">The name of the topic to send the message to.</param> /// <param name="tenant">The id of the tenant the message belongs to.</param> /// <param name="value">The message content.</param> /// <returns>The offset of the message.</returns> public static long Send(string topic, int tenant, byte[] value) { ArgumentHelper.AssertNotEmpty(topic, nameof (topic)); ArgumentHelper.AssertPositive(tenant, nameof (tenant)); try { return KafkaProtocol.Produce(topic, tenant, value); } catch (ConnectionPoolException ex) { return KafkaProtocol.Produce(topic, tenant, value); } catch (KafkaException ex) { if (ex.Error == ErrorCode.NotLeaderForPartition || ex.Error == ErrorCode.LeaderNotAvailable) return KafkaProtocol.Produce(topic, tenant, value); throw; } }
核心的发送方法为:
public static long Produce(string topic, int tenant, byte[] value) { TopicConfig topicConfig = BaseConfig<KafkaMapping>.Instance.GetTopicConfig(topic); int num = tenant % KafkaProtocol.GetTopicPartitionCount(topic); //计算要发往哪个分区 int partitionLeader = KafkaProtocol.GetPartitionLeader(topic, num); //获取分区leader try { using (KafkaSession kafkaSession = new KafkaSession(topicConfig.Cluster, partitionLeader)) //创建一个kafka消息发送实例 { Message message = new Message(value, TimeUtil.CurrentTimestamp); ProduceRequest request = new ProduceRequest((IDictionary<TopicAndPartition, MessageSet>) new Dictionary<TopicAndPartition, MessageSet>() { { new TopicAndPartition(topic, num), //将设置好的topic和partition传入参数 new MessageSet(topicConfig.Codecs, (IList<Message>) new List<Message>() { message }) } }); //设置要发送的消息 ProduceResponse produceResponse = kafkaSession.Issue<ProduceRequest, ProduceResponse>(request); //发送Kafka消息并 KafkaProtocol.CheckErrorCode(produceResponse.Error, topic, new int?(num), new int?(tenant)); return produceResponse.Offset; } } catch (Exception ex) { KafkaProtocol.RefreshPartitionMetadata(topic); throw; } }
这样一个我们需要传递的消息就发送到对应的topic和对应的partition上了(不同的partition可以存放在不同 的机器上,这样取同样余数的租户的数据会被放置到相同分区),无需再自己封装消息分发。
消费消息
在消费者端,机器需要预热并开启消息消费服务,当然也要有关闭消息服务的方法,开启消费服务意味着开启消息接收和开启消息处理线程,关闭消息服务同理表示关闭消息接收和关闭消息处理线程。
/// <summary> /// 接收导出消息的服务 /// </summary> public class ReceiveMsgProvider : IReceiveMsgProvider { #region 日志、构造方法以及单例 protected static readonly LogWrapper Logger = new LogWrapper(); private ReceiveMsgProvider() { } public static ReceiveMsgProvider Instance { get; } = new ReceiveMsgProvider(); #endregion 日志、构造方法以及单例 #region 开启消息接收服务 public bool _ActivateService() { // 预热 Cloud.Plugins.Helper.ESBProxy.WarmUp(); //开启消息接收服务 StartMessageService(); //开始处理ExportQueue队列中的消息 ExportConsumer.Instance.BeginImportData(); Logger.Debug("_ActivateService was called."); return true; } protected void StartMessageService() { try { //开始消费消息 ExportConsumer.Instance.Start(); } catch (Exception ex) { Logger.Error(ex); } } #endregion 开启消息接收服务 #region 关闭消息接收服务 public bool _UnActivateService() { //关闭消息接收服务 StopMessageService(); //关闭处理queue的线程 ExportConsumer.CloseQueueThreads(); Logger.Debug("_UnActivateService was called."); return true; } protected void StopMessageService() { try { //停止消费消息 ExportConsumer.Instance.Stop(); } catch (Exception ex) { Logger.Error(ex); } } }
其中,开启和关闭消息接收服务的核心方法如下:
/// <summary> /// ESB服务调用入口:启动 /// </summary> public void Start() { _loggging.Debug("ESB服务调用入口:启动"); //开启一个消费者组实例,这里设置启用了消费者组来接收消息,相当于启动了一个消费者组实例,在OnMessage里去具体写接收到消息之后的代码处理逻辑 _consumer = new KafkaGroupConsumer(ExportKafkaConst.ExportKafkaConsumerGroup, ExportKafkaConst.ExportKafkaTopic, OnMessage); _consumer .Start(); } /// <summary> /// ESB服务调用入口:停止 /// </summary> public void Stop() { _loggging.Debug("ESB服务调用入口:停止"); if (_consumer != null && _consumer .IsRunning) { _consumer .Stop(); } }
其中消费者的核心实现方法如下:
public KafkaGroupConsumer(string consumerGroup, string topic, Func<Message, bool> handler) { ArgumentHelper.AssertNotEmpty(consumerGroup, "consumerGroup"); //消费者组 ArgumentHelper.AssertNotEmpty(topic, "topic"); //消费主题 ArgumentHelper.AssertNotNull(handler, "handler"); //消息处理函数 _consumerGroup = consumerGroup; //设置消费者组 _topic = topic; //设置topic _consumerId = GenerateConsumerId(consumerGroup); //按照自定义规则给消费者组内生成一个消费者id _handler = handler; ConsumerConfig consumerConfig = BaseConfig<KafkaMapping>.Instance.GetConsumerConfig(consumerGroup);//获取消费者组配置,例如该组内消费者的重试机制、reblance原则等等消费者组的配置 _context = new ConsumerContext(consumerGroup, topic, _consumerId, consumerConfig, Trace.GetTraceHandler(topic, consumerGroup, handler, newConsumer: true)); //整个消费者的上下文,包括组的设置,当前id以及调用trace链路[便于排查问题] _zooKeeperStateManager = new ZooKeeperStateManager(_context); //将该上下文注册到zookeeper中 BaseConfig<KafkaMapping>.ConfigChanged += ReloadConfig; //将消费者变更注册到Zookeeper,当消费者发生变化时,同一 group 的其余消费者会得到通知 }
使用Kafka实现生产者消费者系统的整体流程就是这样。
行文至此,已洋洋洒洒3万5千言,希望能让你对Kafka有个整体的认知,大家共同进步,与诸君共勉