【Kafka从入门到放弃系列 九】Kafka生产者消费者系统实践

简介: 【Kafka从入门到放弃系列 九】Kafka生产者消费者系统实践

在项目实战中就会发现,其实不管是微服务也好,DDD也好,都是为了履行设计原则里的低耦合、高内聚而无论是RabbitMQ还是Kafka,都是通过消息队列的方式对系统进行解耦,在从入门到放弃系列里我详细介绍过其使用背景以及模型等,由于最近的项目用到了RabbitMQ和Kafka,所以索性搭建一个简单的生产者消费者模型。

生产者端

在生产者端,生产者不停的生产消息并发送到kafka的服务器集群上,依据自己的topic和partition:

我们发送Kafka消息的时候,外层的封装方法如下,需要传递一个Kafka的topic、一个用来计算Partition的标识tenantId以及需要传递的消息

public static bool SendKafkaExportData(
      string appName,
      int tenantId,
      int userId,
      string metaObjName,
      string viewName,
      string exportFileName,
      SearchCondition condition,
      string version = null,
      int total = -1,
      ExportFileType fileType = ExportFileType.Xlsx,
      string applicationContext = null,
      string msgTemplate = null)
    {
      Common.HelperObjects.ArgumentHelper.AssertNotEmpty(appName, nameof (appName));
      Common.HelperObjects.ArgumentHelper.AssertNotEmpty(metaObjName, nameof (metaObjName));
      Common.HelperObjects.ArgumentHelper.AssertNotEmpty(viewName, nameof (viewName));
      Common.HelperObjects.ArgumentHelper.AssertNotEmpty(exportFileName, nameof (exportFileName));
      Common.HelperObjects.ArgumentHelper.AssertPositive(tenantId, nameof (tenantId));
      Common.HelperObjects.ArgumentHelper.AssertPositive(userId, nameof (userId));
      Common.HelperObjects.ArgumentHelper.AssertNotNull<SearchCondition>(condition, nameof (condition));
      bool flag = true;
      try
      {
        ExportRequestDataModel exportRequestData = ExportRequestDataModel.GetExportRequestData(appName, tenantId, userId, metaObjName, viewName, exportFileName, condition, version, total, fileType, applicationContext, msgTemplate);
        long num = KafkaProducer.Send<ExportRequestDataModel>("TMLSent", tenantId, exportRequestData);
        ExportRequestDataModel.logger.Debug((object) string.Format("{0}-{1}-{2}发送Kafka消息{3}成功", (object) appName, (object) tenantId, (object) userId, (object) num));
      }
      catch (Exception ex)
      {
        ExportRequestDataModel.logger.Error((object) string.Format("{0}-{1}-{2}发送Kafka消息异常", (object) appName, (object) tenantId, (object) userId), ex);
        flag = false;
      }
      return flag;
    }

而其中的核心方法: long num = KafkaProducer.Send<ExportRequestDataModel>("TMLSent", tenantId, exportRequestData);的实现逻辑如下,将kafka携带的消息序列化为二进制数组:

/// <summary>Send a message to a topic.</summary>
    /// <param name="topic">The name of the topic to send the message to.</param>
    /// <param name="tenant">The id of the tenant the message belongs to.</param>
    /// <param name="value">The message content.</param>
    /// <returns>The offset of the message.</returns>
    public static long Send<T>(string topic, int tenant, T value) where T : IBinarySerializable
    {
      ArgumentHelper.AssertNotEmpty(topic, nameof (topic));
      ArgumentHelper.AssertPositive(tenant, nameof (tenant));
      return KafkaProducer.Send(topic, tenant, (object) value == null ? (byte[]) null : BigEndianEncoder.Encode<T>(value));
    }

消息发送机制如下,获取到需要的topic,用于计算Partition的标识tenantId以及序列化后可以直接发送的二进制字符串消息:

/// <summary>Send a message to a topic.</summary>
    /// <param name="topic">The name of the topic to send the message to.</param>
    /// <param name="tenant">The id of the tenant the message belongs to.</param>
    /// <param name="value">The message content.</param>
    /// <returns>The offset of the message.</returns>
    public static long Send(string topic, int tenant, byte[] value)
    {
      ArgumentHelper.AssertNotEmpty(topic, nameof (topic));
      ArgumentHelper.AssertPositive(tenant, nameof (tenant));
      try
      {
        return KafkaProtocol.Produce(topic, tenant, value);
      }
      catch (ConnectionPoolException ex)
      {
        return KafkaProtocol.Produce(topic, tenant, value);
      }
      catch (KafkaException ex)
      {
        if (ex.Error == ErrorCode.NotLeaderForPartition || ex.Error == ErrorCode.LeaderNotAvailable)
          return KafkaProtocol.Produce(topic, tenant, value);
        throw;
      }
    }

核心的发送方法为:

public static long Produce(string topic, int tenant, byte[] value)
    {
      TopicConfig topicConfig = BaseConfig<KafkaMapping>.Instance.GetTopicConfig(topic);
      int num = tenant % KafkaProtocol.GetTopicPartitionCount(topic);  //计算
      int partitionLeader = KafkaProtocol.GetPartitionLeader(topic, num);  //设置leader
      try
      {
        using (KafkaSession kafkaSession = new KafkaSession(topicConfig.Cluster, partitionLeader))  //创建一个kafka消息发送实例
        {
          Message message = new Message(value, TimeUtil.CurrentTimestamp);
          ProduceRequest request = new ProduceRequest((IDictionary<TopicAndPartition, MessageSet>) new Dictionary<TopicAndPartition, MessageSet>()
          {
            {
              new TopicAndPartition(topic, num),   //设置topic和partition
              new MessageSet(topicConfig.Codecs, (IList<Message>) new List<Message>()
              {
                message
              })
            }
          });   //设置要发送的消息
          ProduceResponse produceResponse = kafkaSession.Issue<ProduceRequest, ProduceResponse>(request);   //发送Kafka消息并
          KafkaProtocol.CheckErrorCode(produceResponse.Error, topic, new int?(num), new int?(tenant));
          return produceResponse.Offset;
        }
      }
      catch (Exception ex)
      {
        KafkaProtocol.RefreshPartitionMetadata(topic);
        throw;
      }
    }

这样一个我们需要传递的消息就发送到对应的topic和对应的partition上了(不同的partition可以存放在不同 的机器上,这样取同样余数的租户的数据会被放置到相同分区),无需再自己封装消息分发。

消费者端

在消费者端,提供消费者集群进行分区消费,需要注意的是:对于一个group而言,消费者的数量不应该多于分区的数量,因为在一个group中,每个分区至多只能绑定到一个消费者上,即一个消费者可以消费多个分区,一个分区只能给一个消费者消费(保证了一个分区里的消息在一个Group里不会被消费者争夺执行),因此,若一个group中的消费者数量大于分区数量的话,多余的消费者将不会收到任何消息

在消费者端,机器需要预热并开启消息消费服务,当然也要有关闭消息服务的方法,开启消费服务意味着开启消息接收和开启消息处理线程,关闭消息服务同理表示关闭消息接收和关闭消息处理线程。

/// <summary>
  /// 接收导出消息的服务
  /// </summary>
  public class ReceiveMsgProvider : IReceiveMsgProvider
  {
      #region 日志、构造方法以及单例
      protected static readonly LogWrapper Logger = new LogWrapper();
      private ReceiveMsgProvider()
      {
      }
      public static ReceiveMsgProvider Instance { get; } = new ReceiveMsgProvider();
      #endregion 日志、构造方法以及单例
      #region 开启消息接收服务
      public bool _ActivateService()
      {
          // 预热
         Cloud.Plugins.Helper.ESBProxy.WarmUp();
          //开启消息接收服务
          StartMessageService();
          //开始处理ExportQueue队列中的消息
          ExportConsumer.Instance.BeginImportData();
          Logger.Debug("_ActivateService was called.");
          return true;
      }
      protected void StartMessageService()
      {
          try
          {
              //开始消费消息
              ExportConsumer.Instance.Start();
          }
          catch (Exception ex)
          {
              Logger.Error(ex);
          }
      }
      #endregion 开启消息接收服务
      #region 关闭消息接收服务
      public bool _UnActivateService()
      {
          //关闭消息接收服务
          StopMessageService();
          //关闭处理queue的线程
          ExportConsumer.CloseQueueThreads(); 
          Logger.Debug("_UnActivateService was called.");
          return true;
      }
      protected void StopMessageService()
      {
          try
          {
              //停止消费消息
              ExportConsumer.Instance.Stop();
          }
          catch (Exception ex)
          {
              Logger.Error(ex);
          }
      }
  }

其中,开启和关闭消息接收服务的核心方法如下:

/// <summary>
        /// ESB服务调用入口:启动
        /// </summary>
        public void Start()
        {
             _loggging.Debug("ESB服务调用入口:启动");
            _consumer = new KafkaGroupConsumer(ExportKafkaConst.ExportKafkaConsumerGroup, ExportKafkaConst.ExportKafkaTopic, OnMessage);   //OnMessage即是处理消费逻辑的方法
            _consumer .Start();
        }
        /// <summary>
        /// ESB服务调用入口:停止
        /// </summary>
        public void Stop()
        {
            _loggging.Debug("ESB服务调用入口:停止");
            if (_consumer  != null && _consumer .IsRunning)
            {
                _consumer .Stop();
            }
        }

当然这一套生产消费体系搭建起来后最重要的就是接收和消费消息啦:

/// <summary>
        /// 接收导出消息并放置到缓存队列里
        /// </summary>
        /// <param name="context"></param>
        /// <returns></returns>
        public bool OnMessage(Message context)
        {
            logger.Debug(string.Format("接收到消息:{0}", Newtonsoft.Json.JsonConvert.SerializeObject(context.Value)));
            ExportRequestDataModel data = null;
            try
            {
                //读取消息内容
                data = BigEndianEncoder.Decode<ExportRequestDataModel>(context.Value);
                if (data == null || data.MsgType != ExportMessageType.Export)
                {
                    //无消息需要处理
                    return true;
                }
                else
                {
                    logger.Debug(string.Format("成功取到数据:{0}", Newtonsoft.Json.JsonConvert.SerializeObject(data)));
                }
                bool enQueueResult = ExportQueue.En_Queue(data, ApplicationContext.Current.TenantId);
                if (!enQueueResult)
                {
                    logger.Error("导出数据 In_Queue失败:userId:" + ApplicationContext.Current.UserId + " tenantId:" + ApplicationContext.Current.TenantId);
                }
            }
            catch (Exception ex)
            {
                var contextInfo = JsonConvert.SerializeObject(context);
                logger.Error($"导出写入队列失败,接收到的消息为:{contextInfo} ,信息异常信息:" + ex);
            }
            return true;
        }

消息一个个入队后当然接下来就要使用我们启动的一批线程去消费啦:

/// <summary>
        /// 开始对ExportQueue队列中的数据进行梳理
        /// </summary>
        public void BeginImportData()
        {
            ///初始化线程List
            ExportQueue.InstanceExportThreadsList();
            int count = ExportQueue.ExportThreadsList.Count;
            for (int i = 0; i < count; i++)
            {
                logger.Debug("开启线程th_" + i + "");
                ExportQueue.ExportThreadsList[i] = new System.Threading.Thread(new System.Threading.ParameterizedThreadStart(DealExportDataInQueue));
                ExportQueue.ExportThreadsList[i].Start(i);
            }
            // 处理小队列
            ExportQueue.ExportSmallThread = new System.Threading.Thread(new System.Threading.ParameterizedThreadStart(DealExportDataInQueue));
            ExportQueue.ExportSmallThread.Start(ExportQueue.SmallIndex);
        }

使用Kafka实现生产者消费者系统的整体流程就是这样,说实话我还是对某些实现细节一知半解,但总体而言知道了全套流程是怎么跑通的,至于一些更高阶的认知,期望从之后更深入的学习中来了解到。

相关文章
|
19天前
|
消息中间件 大数据 Kafka
【Kafka】Kafka 中生产者运行流程
【4月更文挑战第10天】【Kafka】Kafka 中生产者运行流程
|
2月前
|
消息中间件 缓存 Java
✈️【Kafka技术专题】「开发实战篇」深入实战探索Kafka的生产者的开发实现及实战指南
✈️【Kafka技术专题】「开发实战篇」深入实战探索Kafka的生产者的开发实现及实战指南
45 0
|
2月前
|
消息中间件 负载均衡 Kafka
【Kafka面试演练】那Kafka消费者手动提交、自动提交有什么区别?
嗯嗯Ok。分区的作用主要就是为了提高Kafka处理消息吞吐量。每一个topic会被分为多个分区。假如同一个topic下有n个分区、n个消费者,这样的话每个分区就会发送消息给对应的一个消费者,这样n个消费者负载均衡地处理消息。同时生产者会发送消息给不同分区,每个分区分给不同的brocker处理,让集群平坦压力,这样大大提高了Kafka的吞吐量。面试官思考中…
70 4
|
2月前
|
消息中间件 Java Kafka
关于kafka消费者超时配置
关于kafka消费者超时配置
94 2
|
2月前
|
消息中间件 存储 大数据
Apache Kafka: 强大消息队列系统的介绍与使用
Apache Kafka: 强大消息队列系统的介绍与使用
|
2月前
|
消息中间件 存储 负载均衡
Kafka【付诸实践 01】生产者发送消息的过程描述及设计+创建生产者并发送消息(同步、异步)+自定义分区器+自定义序列化器+生产者其他属性说明(实例源码粘贴可用)【一篇学会使用Kafka生产者】
【2月更文挑战第21天】Kafka【付诸实践 01】生产者发送消息的过程描述及设计+创建生产者并发送消息(同步、异步)+自定义分区器+自定义序列化器+生产者其他属性说明(实例源码粘贴可用)【一篇学会使用Kafka生产者】
181 4
|
2月前
|
消息中间件 分布式计算 Kafka
SparkStreaming(SparkStreaming概述、入门、Kafka数据源、DStream转换、输出、关闭)
SparkStreaming(SparkStreaming概述、入门、Kafka数据源、DStream转换、输出、关闭)(一)
|
2月前
|
消息中间件 Java Kafka
Kafka【环境搭建 01】kafka_2.12-2.6.0 单机版安装+参数配置及说明+添加到service服务+开机启动配置+验证+chkconfig配置说明(一篇入门kafka)
【2月更文挑战第19天】Kafka【环境搭建 01】kafka_2.12-2.6.0 单机版安装+参数配置及说明+添加到service服务+开机启动配置+验证+chkconfig配置说明(一篇入门kafka)
55 1
|
12天前
|
消息中间件 缓存 安全
Kafka 的生产者优秀架构设计
Kafka 的生产者优秀架构设计
21 0
|
13天前
|
消息中间件 存储 Java
深度探索:使用Apache Kafka构建高效Java消息队列处理系统
【4月更文挑战第17天】本文介绍了在Java环境下使用Apache Kafka进行消息队列处理的方法。Kafka是一个分布式流处理平台,采用发布/订阅模型,支持高效的消息生产和消费。文章详细讲解了Kafka的核心概念,包括主题、生产者和消费者,以及消息的存储和消费流程。此外,还展示了Java代码示例,说明如何创建生产者和消费者。最后,讨论了在高并发场景下的优化策略,如分区、消息压缩和批处理。通过理解和应用这些策略,可以构建高性能的消息系统。

热门文章

最新文章