【Kafka从入门到放弃系列 九】Kafka生产者消费者系统实践

简介: 【Kafka从入门到放弃系列 九】Kafka生产者消费者系统实践

在项目实战中就会发现,其实不管是微服务也好,DDD也好,都是为了履行设计原则里的低耦合、高内聚而无论是RabbitMQ还是Kafka,都是通过消息队列的方式对系统进行解耦,在从入门到放弃系列里我详细介绍过其使用背景以及模型等,由于最近的项目用到了RabbitMQ和Kafka,所以索性搭建一个简单的生产者消费者模型。

生产者端

在生产者端,生产者不停的生产消息并发送到kafka的服务器集群上,依据自己的topic和partition:

我们发送Kafka消息的时候,外层的封装方法如下,需要传递一个Kafka的topic、一个用来计算Partition的标识tenantId以及需要传递的消息

public static bool SendKafkaExportData(
      string appName,
      int tenantId,
      int userId,
      string metaObjName,
      string viewName,
      string exportFileName,
      SearchCondition condition,
      string version = null,
      int total = -1,
      ExportFileType fileType = ExportFileType.Xlsx,
      string applicationContext = null,
      string msgTemplate = null)
    {
      Common.HelperObjects.ArgumentHelper.AssertNotEmpty(appName, nameof (appName));
      Common.HelperObjects.ArgumentHelper.AssertNotEmpty(metaObjName, nameof (metaObjName));
      Common.HelperObjects.ArgumentHelper.AssertNotEmpty(viewName, nameof (viewName));
      Common.HelperObjects.ArgumentHelper.AssertNotEmpty(exportFileName, nameof (exportFileName));
      Common.HelperObjects.ArgumentHelper.AssertPositive(tenantId, nameof (tenantId));
      Common.HelperObjects.ArgumentHelper.AssertPositive(userId, nameof (userId));
      Common.HelperObjects.ArgumentHelper.AssertNotNull<SearchCondition>(condition, nameof (condition));
      bool flag = true;
      try
      {
        ExportRequestDataModel exportRequestData = ExportRequestDataModel.GetExportRequestData(appName, tenantId, userId, metaObjName, viewName, exportFileName, condition, version, total, fileType, applicationContext, msgTemplate);
        long num = KafkaProducer.Send<ExportRequestDataModel>("TMLSent", tenantId, exportRequestData);
        ExportRequestDataModel.logger.Debug((object) string.Format("{0}-{1}-{2}发送Kafka消息{3}成功", (object) appName, (object) tenantId, (object) userId, (object) num));
      }
      catch (Exception ex)
      {
        ExportRequestDataModel.logger.Error((object) string.Format("{0}-{1}-{2}发送Kafka消息异常", (object) appName, (object) tenantId, (object) userId), ex);
        flag = false;
      }
      return flag;
    }

而其中的核心方法: long num = KafkaProducer.Send<ExportRequestDataModel>("TMLSent", tenantId, exportRequestData);的实现逻辑如下,将kafka携带的消息序列化为二进制数组:

/// <summary>Send a message to a topic.</summary>
    /// <param name="topic">The name of the topic to send the message to.</param>
    /// <param name="tenant">The id of the tenant the message belongs to.</param>
    /// <param name="value">The message content.</param>
    /// <returns>The offset of the message.</returns>
    public static long Send<T>(string topic, int tenant, T value) where T : IBinarySerializable
    {
      ArgumentHelper.AssertNotEmpty(topic, nameof (topic));
      ArgumentHelper.AssertPositive(tenant, nameof (tenant));
      return KafkaProducer.Send(topic, tenant, (object) value == null ? (byte[]) null : BigEndianEncoder.Encode<T>(value));
    }

消息发送机制如下,获取到需要的topic,用于计算Partition的标识tenantId以及序列化后可以直接发送的二进制字符串消息:

/// <summary>Send a message to a topic.</summary>
    /// <param name="topic">The name of the topic to send the message to.</param>
    /// <param name="tenant">The id of the tenant the message belongs to.</param>
    /// <param name="value">The message content.</param>
    /// <returns>The offset of the message.</returns>
    public static long Send(string topic, int tenant, byte[] value)
    {
      ArgumentHelper.AssertNotEmpty(topic, nameof (topic));
      ArgumentHelper.AssertPositive(tenant, nameof (tenant));
      try
      {
        return KafkaProtocol.Produce(topic, tenant, value);
      }
      catch (ConnectionPoolException ex)
      {
        return KafkaProtocol.Produce(topic, tenant, value);
      }
      catch (KafkaException ex)
      {
        if (ex.Error == ErrorCode.NotLeaderForPartition || ex.Error == ErrorCode.LeaderNotAvailable)
          return KafkaProtocol.Produce(topic, tenant, value);
        throw;
      }
    }

核心的发送方法为:

public static long Produce(string topic, int tenant, byte[] value)
    {
      TopicConfig topicConfig = BaseConfig<KafkaMapping>.Instance.GetTopicConfig(topic);
      int num = tenant % KafkaProtocol.GetTopicPartitionCount(topic);  //计算
      int partitionLeader = KafkaProtocol.GetPartitionLeader(topic, num);  //设置leader
      try
      {
        using (KafkaSession kafkaSession = new KafkaSession(topicConfig.Cluster, partitionLeader))  //创建一个kafka消息发送实例
        {
          Message message = new Message(value, TimeUtil.CurrentTimestamp);
          ProduceRequest request = new ProduceRequest((IDictionary<TopicAndPartition, MessageSet>) new Dictionary<TopicAndPartition, MessageSet>()
          {
            {
              new TopicAndPartition(topic, num),   //设置topic和partition
              new MessageSet(topicConfig.Codecs, (IList<Message>) new List<Message>()
              {
                message
              })
            }
          });   //设置要发送的消息
          ProduceResponse produceResponse = kafkaSession.Issue<ProduceRequest, ProduceResponse>(request);   //发送Kafka消息并
          KafkaProtocol.CheckErrorCode(produceResponse.Error, topic, new int?(num), new int?(tenant));
          return produceResponse.Offset;
        }
      }
      catch (Exception ex)
      {
        KafkaProtocol.RefreshPartitionMetadata(topic);
        throw;
      }
    }

这样一个我们需要传递的消息就发送到对应的topic和对应的partition上了(不同的partition可以存放在不同 的机器上,这样取同样余数的租户的数据会被放置到相同分区),无需再自己封装消息分发。

消费者端

在消费者端,提供消费者集群进行分区消费,需要注意的是:对于一个group而言,消费者的数量不应该多于分区的数量,因为在一个group中,每个分区至多只能绑定到一个消费者上,即一个消费者可以消费多个分区,一个分区只能给一个消费者消费(保证了一个分区里的消息在一个Group里不会被消费者争夺执行),因此,若一个group中的消费者数量大于分区数量的话,多余的消费者将不会收到任何消息

在消费者端,机器需要预热并开启消息消费服务,当然也要有关闭消息服务的方法,开启消费服务意味着开启消息接收和开启消息处理线程,关闭消息服务同理表示关闭消息接收和关闭消息处理线程。

/// <summary>
  /// 接收导出消息的服务
  /// </summary>
  public class ReceiveMsgProvider : IReceiveMsgProvider
  {
      #region 日志、构造方法以及单例
      protected static readonly LogWrapper Logger = new LogWrapper();
      private ReceiveMsgProvider()
      {
      }
      public static ReceiveMsgProvider Instance { get; } = new ReceiveMsgProvider();
      #endregion 日志、构造方法以及单例
      #region 开启消息接收服务
      public bool _ActivateService()
      {
          // 预热
         Cloud.Plugins.Helper.ESBProxy.WarmUp();
          //开启消息接收服务
          StartMessageService();
          //开始处理ExportQueue队列中的消息
          ExportConsumer.Instance.BeginImportData();
          Logger.Debug("_ActivateService was called.");
          return true;
      }
      protected void StartMessageService()
      {
          try
          {
              //开始消费消息
              ExportConsumer.Instance.Start();
          }
          catch (Exception ex)
          {
              Logger.Error(ex);
          }
      }
      #endregion 开启消息接收服务
      #region 关闭消息接收服务
      public bool _UnActivateService()
      {
          //关闭消息接收服务
          StopMessageService();
          //关闭处理queue的线程
          ExportConsumer.CloseQueueThreads(); 
          Logger.Debug("_UnActivateService was called.");
          return true;
      }
      protected void StopMessageService()
      {
          try
          {
              //停止消费消息
              ExportConsumer.Instance.Stop();
          }
          catch (Exception ex)
          {
              Logger.Error(ex);
          }
      }
  }

其中,开启和关闭消息接收服务的核心方法如下:

/// <summary>
        /// ESB服务调用入口:启动
        /// </summary>
        public void Start()
        {
             _loggging.Debug("ESB服务调用入口:启动");
            _consumer = new KafkaGroupConsumer(ExportKafkaConst.ExportKafkaConsumerGroup, ExportKafkaConst.ExportKafkaTopic, OnMessage);   //OnMessage即是处理消费逻辑的方法
            _consumer .Start();
        }
        /// <summary>
        /// ESB服务调用入口:停止
        /// </summary>
        public void Stop()
        {
            _loggging.Debug("ESB服务调用入口:停止");
            if (_consumer  != null && _consumer .IsRunning)
            {
                _consumer .Stop();
            }
        }

当然这一套生产消费体系搭建起来后最重要的就是接收和消费消息啦:

/// <summary>
        /// 接收导出消息并放置到缓存队列里
        /// </summary>
        /// <param name="context"></param>
        /// <returns></returns>
        public bool OnMessage(Message context)
        {
            logger.Debug(string.Format("接收到消息:{0}", Newtonsoft.Json.JsonConvert.SerializeObject(context.Value)));
            ExportRequestDataModel data = null;
            try
            {
                //读取消息内容
                data = BigEndianEncoder.Decode<ExportRequestDataModel>(context.Value);
                if (data == null || data.MsgType != ExportMessageType.Export)
                {
                    //无消息需要处理
                    return true;
                }
                else
                {
                    logger.Debug(string.Format("成功取到数据:{0}", Newtonsoft.Json.JsonConvert.SerializeObject(data)));
                }
                bool enQueueResult = ExportQueue.En_Queue(data, ApplicationContext.Current.TenantId);
                if (!enQueueResult)
                {
                    logger.Error("导出数据 In_Queue失败:userId:" + ApplicationContext.Current.UserId + " tenantId:" + ApplicationContext.Current.TenantId);
                }
            }
            catch (Exception ex)
            {
                var contextInfo = JsonConvert.SerializeObject(context);
                logger.Error($"导出写入队列失败,接收到的消息为:{contextInfo} ,信息异常信息:" + ex);
            }
            return true;
        }

消息一个个入队后当然接下来就要使用我们启动的一批线程去消费啦:

/// <summary>
        /// 开始对ExportQueue队列中的数据进行梳理
        /// </summary>
        public void BeginImportData()
        {
            ///初始化线程List
            ExportQueue.InstanceExportThreadsList();
            int count = ExportQueue.ExportThreadsList.Count;
            for (int i = 0; i < count; i++)
            {
                logger.Debug("开启线程th_" + i + "");
                ExportQueue.ExportThreadsList[i] = new System.Threading.Thread(new System.Threading.ParameterizedThreadStart(DealExportDataInQueue));
                ExportQueue.ExportThreadsList[i].Start(i);
            }
            // 处理小队列
            ExportQueue.ExportSmallThread = new System.Threading.Thread(new System.Threading.ParameterizedThreadStart(DealExportDataInQueue));
            ExportQueue.ExportSmallThread.Start(ExportQueue.SmallIndex);
        }

使用Kafka实现生产者消费者系统的整体流程就是这样,说实话我还是对某些实现细节一知半解,但总体而言知道了全套流程是怎么跑通的,至于一些更高阶的认知,期望从之后更深入的学习中来了解到。

相关文章
|
7月前
|
消息中间件 Java Kafka
Java 事件驱动架构设计实战与 Kafka 生态系统组件实操全流程指南
本指南详解Java事件驱动架构与Kafka生态实操,涵盖环境搭建、事件模型定义、生产者与消费者实现、事件测试及高级特性,助你快速构建高可扩展分布式系统。
348 7
|
消息中间件 安全 Kafka
Apache Kafka安全加固指南:保护你的消息传递系统
【10月更文挑战第24天】在现代企业环境中,数据的安全性和隐私保护至关重要。Apache Kafka作为一款广泛使用的分布式流处理平台,其安全性直接影响着业务的稳定性和用户数据的安全。作为一名资深的Kafka使用者,我深知加强Kafka安全性的重要性。本文将从个人角度出发,分享我在实践中积累的经验,帮助读者了解如何有效地保护Kafka消息传递系统的安全性。
1023 7
|
消息中间件 存储 监控
构建高可用性Apache Kafka集群:从理论到实践
【10月更文挑战第24天】随着大数据时代的到来,数据传输与处理的需求日益增长。Apache Kafka作为一个高性能的消息队列服务,因其出色的吞吐量、可扩展性和容错能力而受到广泛欢迎。然而,在构建大规模生产环境下的Kafka集群时,保证其高可用性是至关重要的。本文将从个人实践经验出发,详细介绍如何构建一个高可用性的Kafka集群,包括集群规划、节点配置以及故障恢复机制等方面。
488 4
|
10月前
|
消息中间件 Linux Kafka
linux命令使用消费kafka的生产者、消费者
linux命令使用消费kafka的生产者、消费者
394 16
|
消息中间件 Kafka
【赵渝强老师】Kafka生产者的执行过程
Kafka生产者(Producer)将消息序列化后发送到指定主题的分区。整个过程由主线程和Sender线程协调完成。主线程创建KafkaProducer对象及ProducerRecord,经过拦截器、序列化器和分区器处理后,消息进入累加器。Sender线程负责从累加器获取消息并发送至KafkaBroker,Broker返回响应或错误信息,生产者根据反馈决定是否重发。视频和图片详细展示了这一流程。
288 61
|
12月前
|
消息中间件 Java Kafka
SpringBoot使用Kafka生产者、消费者
SpringBoot使用Kafka生产者、消费者
584 10
|
消息中间件 Kafka
【赵渝强老师】Kafka生产者的消息发送方式
Kafka生产者支持三种消息发送方式:1. **fire-and-forget**:发送后不关心结果,适用于允许消息丢失的场景;2. **同步发送**:通过Future对象确保消息成功送达,适用于高可靠性需求场景;3. **异步发送**:使用回调函数处理结果,吞吐量较高但牺牲部分可靠性。视频和代码示例详细讲解了这三种方式的具体实现。
409 5
|
消息中间件 Kafka
【赵渝强老师】Kafka的消费者与消费者组
Kafka消费者是从Kafka集群中消费数据的客户端。单消费者模型在数据生产速度超过消费速度时会导致数据堆积。为解决此问题,Kafka引入了消费者组的概念,允许多个消费者共同消费同一主题的消息。消费者组由一个或多个消费者组成,它们动态分配和重新分配主题分区,确保消息处理的高效性和可靠性。视频讲解及示意图详细展示了这一机制。
311 1
|
消息中间件 Java Kafka
初识Apache Kafka:搭建你的第一个消息队列系统
【10月更文挑战第24天】在数字化转型的浪潮中,数据成为了企业决策的关键因素之一。而高效的数据处理能力,则成为了企业在竞争中脱颖而出的重要武器。在这个背景下,消息队列作为连接不同系统和服务的桥梁,其重要性日益凸显。Apache Kafka 是一款开源的消息队列系统,以其高吞吐量、可扩展性和持久性等特点受到了广泛欢迎。作为一名技术爱好者,我对 Apache Kafka 产生了浓厚的兴趣,并决定亲手搭建一套属于自己的消息队列系统。
379 2
初识Apache Kafka:搭建你的第一个消息队列系统
|
消息中间件 存储 负载均衡
Apache Kafka核心概念解析:生产者、消费者与Broker
【10月更文挑战第24天】在数字化转型的大潮中,数据的实时处理能力成为了企业竞争力的重要组成部分。Apache Kafka 作为一款高性能的消息队列系统,在这一领域占据了重要地位。通过使用 Kafka,企业可以构建出高效的数据管道,实现数据的快速传输和处理。今天,我将从个人的角度出发,深入解析 Kafka 的三大核心组件——生产者、消费者与 Broker,希望能够帮助大家建立起对 Kafka 内部机制的基本理解。
569 2