NET中解决KafKa多线程发送多主题的问题

简介:

一般在KafKa消费程序中消费可以设置多个主题,那在同一程序中需要向KafKa发送不同主题的消息,如异常需要发到异常主题,正常的发送到正常的主题,这时候就需要实例化多个主题,然后逐个发送。

  在NET中用RdKafka组件来做消息处理,在Nuget中引用。

  在程序中初始化Producer,并创建多个Topic

复制代码
        private string comtopic = "topic1";
        private string errtopic = "topic2";
        private string kfkip = "192.168.80.32:9092";
        Topic topic = null;
        Topic errTopic = null;

        public ExcuteFlow()
        {
            try
            {
                Producer producer = new Producer(kfkip);
                topic = producer.Topic(comtopic);
                errTopic = producer.Topic(errtopic);
            }
            catch (RdKafkaException ex)
            {
                LogHelper.Error("KafKa初始化KafKa异常 ", ex);
            }
            catch (Exception ex)
            {
                LogHelper.Error("KafKa初始化异常", ex);
            }

        }
复制代码

  在程序中发送其中一个主题:

复制代码
          try
            {

                if (topic != null)
                {
                    byte[] datas = Encoding.UTF8.GetBytes(JsonHelper.ToJson(flowCommond));
                    Task<DeliveryReport> deliveryReport = topic.Produce(datas);
                    var unused = deliveryReport.ContinueWith(task =>
                    {
                        LogHelper.Info("内容:{flowCommond.ID} 发送到分区:{task.Result.Partition}, Offset 为: {task.Result.Offset}");
                    });
                }
                else
                {
                    throw new Exception("发送消息到KafKa topic 为空");
                }
            }
            catch (RdKafkaException ex)
            {
                LogHelper.Error("发送消息到KafKa  KafKa异常", ex);
            }
            catch (Exception ex)
            {
                LogHelper.Error("发送消息到KafKa异常", ex);
            }
复制代码

  flowCommond为要发送的对象内容,格式化为Json字符串再发送。

  另一个主题一样处理。

   这里实现一个线程里面发送多个主题,那下面实现多个线程中如何发送多个主题。

  多线程中如果每个线程都new Producer(kfkip) 一次,那KafKa的连接很快会被占满。

  那这里就用单例模式来解决这个问题,每次要用到Producer时检查一下是否已经存在Producer实例,若存在则直接用不用再生成。

复制代码
    /// <summary>
    /// 单例模式的实现
    /// </summary>
    public class SingleProduct : Producer
    {
        // 定义一个静态变量来保存类的实例
        private static SingleProduct uniqueInstance;
        // 定义一个标识确保线程同步
        private static readonly object locker = new object();
        // 定义私有构造函数,使外界不能创建该类实例
        private SingleProduct(string brokerList) : base(brokerList)
        {
        }

        /// <summary>
        /// 定义公有方法提供一个全局访问点,同时你也可以定义公有属性来提供全局访问点
        /// </summary>
        /// <returns></returns>
        public static SingleProduct GetInstance()
        {
            // 当第一个线程运行到这里时,此时会对locker对象 "加锁",
            // 当第二个线程运行该方法时,首先检测到locker对象为"加锁"状态,该线程就会挂起等待第一个线程解锁
            // lock语句运行完之后(即线程运行完之后)会对该对象"解锁"
            if (uniqueInstance == null)
            {
                lock (locker)
                {
                    // 如果类的实例不存在则创建,否则直接返回
                    if (uniqueInstance == null)
                    {
                        string kfkip = System.Configuration.ConfigurationManager.AppSettings["KfkIP"];

                        try
                        {
                            uniqueInstance = new SingleProduct(kfkip);
                            LogHelper.Error("单例模式 实例化 SingleProduct");
                        }
                        catch (RdKafkaException ex)
                        {
                            LogHelper.Error("单例模式 KafKa初始化KafKa异常 ", ex);
                        }
                        catch (Exception ex)
                        {
                            LogHelper.Error("单例模式 KafKa初始化异常", ex);
                        }
                    }
                }
            }

            return uniqueInstance;
        }
    }
复制代码

   然后在初始化的代码中替换Producer producer = new Producer(kfkip);为 Producer producer = SingleProduct.GetInstance();

  OK!以上就完成了多线程多主题的消息发送。

本文转自欢醉博客园博客,原文链接http://www.cnblogs.com/zhangs1986/p/7285525.html如需转载请自行联系原作者


欢醉

相关文章
|
6月前
|
消息中间件 监控 安全
探究Kafka主题删除失败的根本原因
探究Kafka主题删除失败的根本原因
88 0
|
6月前
|
并行计算 安全 Java
C# .NET面试系列四:多线程
<h2>多线程 #### 1. 根据线程安全的相关知识,分析以下代码,当调用 test 方法时 i > 10 时是否会引起死锁? 并简要说明理由。 ```c# public void test(int i) { lock(this) { if (i > 10) { i--; test(i); } } } ``` 在给定的代码中,不会发生死锁。死锁通常是由于两个或多个线程互相等待对方释放锁而无法继续执行的情况。在这个代码中,只有一个线程持有锁,且没有其他线程参与,因此不
383 3
|
2月前
|
消息中间件 安全 大数据
Kafka多线程Consumer是实现高并发数据处理的有效手段之一
【9月更文挑战第2天】Kafka多线程Consumer是实现高并发数据处理的有效手段之一
234 4
|
3月前
|
消息中间件 Java 大数据
"深入理解Kafka单线程Consumer:核心参数配置、Java实现与实战指南"
【8月更文挑战第10天】在大数据领域,Apache Kafka以高吞吐和可扩展性成为主流数据流处理平台。Kafka的单线程Consumer因其实现简单且易于管理而在多种场景中受到欢迎。本文解析单线程Consumer的工作机制,强调其在错误处理和状态管理方面的优势,并通过详细参数说明及示例代码展示如何有效地使用KafkaConsumer类。了解这些内容将帮助开发者优化实时数据处理系统的性能与可靠性。
86 7
|
3月前
|
消息中间件 安全 Kafka
"深入实践Kafka多线程Consumer:案例分析、实现方式、优缺点及高效数据处理策略"
【8月更文挑战第10天】Apache Kafka是一款高性能的分布式流处理平台,以高吞吐量和可扩展性著称。为提升数据处理效率,常采用多线程消费Kafka数据。本文通过电商订单系统的案例,探讨了多线程Consumer的实现方法及其利弊,并提供示例代码。案例展示了如何通过并行处理加快订单数据的处理速度,确保数据正确性和顺序性的同时最大化资源利用。多线程Consumer有两种主要模式:每线程一个实例和单实例多worker线程。前者简单易行但资源消耗较大;后者虽能解耦消息获取与处理,却增加了系统复杂度。通过合理设计,多线程Consumer能够有效支持高并发数据处理需求。
158 4
|
4月前
|
消息中间件 存储 Kafka
深入理解Kafka核心设计及原理(四):主题管理
深入理解Kafka核心设计及原理(四):主题管理
74 8
|
6月前
|
消息中间件 存储 网络协议
Kafka 线程模型痛点攻克: 提升分区写入 2 倍性能
Apache Kafka的单分区写入性能在某些严格保序场景中至关重要,但其现有线程模型限制了性能发挥。本文分析了Kafka的串行处理模型,包括SocketServer、KafkaChannel、RequestChannel等组件,指出其通过KafkaChannel状态机确保请求顺序处理,导致处理效率低下。AutoMQ提出流水线处理模型,简化KafkaChannel状态机,实现网络解析、校验定序和持久化的阶段间并行化,提高处理效率。测试结果显示,AutoMQ的极限吞吐是Kafka的2倍,P99延迟降低至11ms。
121 3
Kafka 线程模型痛点攻克: 提升分区写入 2 倍性能
|
5月前
|
开发框架 监控 Java
【.NET Core】多线程之线程池(ThreadPool)详解(二)
【.NET Core】多线程之线程池(ThreadPool)详解(二)
88 3
|
5月前
|
SQL 开发框架 Java
【.NET Core】多线程之线程池(ThreadPool)详解(一)
【.NET Core】多线程之线程池(ThreadPool)详解(一)
378 2
|
5月前
|
算法 安全 Java
【.NET Core】 多线程之(Thread)详解
【.NET Core】 多线程之(Thread)详解
60 1
下一篇
无影云桌面