消息队列七

简介: 使用.net客户端连接到Kafka 这里推荐使用一个开源.net客户端:https://github.com/Jroland/kafka-net Producer 1 var options = new KafkaOptions(new Uri("http://SERVER1:90...

使用.net客户端连接到Kafka

这里推荐使用一个开源.net客户端:https://github.com/Jroland/kafka-net

Producer
1 var options = new KafkaOptions(new Uri("http://SERVER1:9092"), new Uri("http://SERVER2:9092"));
2 var router = new BrokerRouter(options);
3 var client = new Producer(router);
4 
5 client.SendMessageAsync("TestHarness", new[] { new Message("hello world")}).Wait();
6 
7 using (client) { }

 

Consumer
 1 var options = new KafkaOptions(new Uri("http://SERVER1:9092"), new Uri("http://SERVER2:9092"));
 2 var router = new BrokerRouter(options);
 3 var consumer = new Consumer(new ConsumerOptions("TestHarness", router));
 4 
 5 //Consume returns a blocking IEnumerable (ie: never ending stream)
 6 foreach (var message in consumer.Consume())
 7 {
 8     Console.WriteLine("Response: P{0},O{1} : {2}", 
 9         message.Meta.PartitionId, message.Meta.Offset, message.Value);  
10 }

 

完整测试代码

Producer生产者:

 1 class Program
 2     {
 3         static void Main(string[] args)
 4         {
 5             do
 6             {
 7                 Produce(GetKafkaBroker(), getTopicName());
 8                 System.Threading.Thread.Sleep(3000);
 9             } while (true);
10         }
11 
12         private static void Produce(string broker, string topic)
13         {
14             var options = new KafkaOptions(new Uri(broker));
15             var router = new BrokerRouter(options);
16             var client = new Producer(router);
17 
18             var currentDatetime =DateTime.Now;
19             var key = currentDatetime.Second.ToString();
20             var events = new[] { new Message("Hello World " + currentDatetime, key) };
21             client.SendMessageAsync(topic, events).Wait(1500);
22             Console.WriteLine("Produced: Key: {0}. Message: {1}", key, events[0].Value.ToUtf8String());
23 
24             using (client) { }
25         }
26 
27         private static string GetKafkaBroker()
28         {
29             string KafkaBroker = string.Empty;
30             const string kafkaBrokerKeyName = "KafkaBroker";
31 
32             if (!ConfigurationManager.AppSettings.AllKeys.Contains(kafkaBrokerKeyName))
33             {
34                 KafkaBroker = "http://localhost:9092";
35             }
36             else
37             {
38                 KafkaBroker = ConfigurationManager.AppSettings[kafkaBrokerKeyName];
39             }
40             return KafkaBroker;
41         }
42         private static string getTopicName()
43         {
44             string TopicName = string.Empty;
45             const string topicNameKeyName = "Topic";
46 
47             if (!ConfigurationManager.AppSettings.AllKeys.Contains(topicNameKeyName))
48             {
49                 throw new Exception("Key \"" + topicNameKeyName + "\" not found in Config file -> configuration/AppSettings");
50             }
51             else
52             {
53                 TopicName = ConfigurationManager.AppSettings[topicNameKeyName];
54             }
55             return TopicName;
56         }
57     }

 

Consumer消费者:

 1 class Program
 2     {
 3         static void Main(string[] args)
 4         {
 5             Consume(getKafkaBroker(), getTopicName());
 6 
 7         }
 8 
 9         private static void Consume(string broker, string topic)
10         {   
11             var options = new KafkaOptions(new Uri(broker));
12             var router = new BrokerRouter(options);
13             var consumer = new Consumer(new ConsumerOptions(topic, router));
14 
15             //Consume returns a blocking IEnumerable (ie: never ending stream)
16             foreach (var message in consumer.Consume())
17             {
18                 Console.WriteLine("Response: Partition {0},Offset {1} : {2}",
19                     message.Meta.PartitionId, message.Meta.Offset, message.Value.ToUtf8String());
20             }
21         }
22 
23         private static string getKafkaBroker()
24         {
25             string KafkaBroker = string.Empty;
26             var KafkaBrokerKeyName = "KafkaBroker";
27 
28             if (!ConfigurationManager.AppSettings.AllKeys.Contains(KafkaBrokerKeyName))
29             {
30                 KafkaBroker = "http://localhost:9092";
31             }
32             else
33             {
34                 KafkaBroker = ConfigurationManager.AppSettings[KafkaBrokerKeyName];
35             }
36             return KafkaBroker;
37         }
38 
39         private static string getTopicName()
40         {
41             string TopicName = string.Empty;
42             var TopicNameKeyName = "Topic";
43 
44             if (!ConfigurationManager.AppSettings.AllKeys.Contains(TopicNameKeyName))
45             {
46                 throw new Exception("Key \"" + TopicNameKeyName + "\" not found in Config file -> configuration/AppSettings");
47             }
48             else
49             {
50                 TopicName = ConfigurationManager.AppSettings[TopicNameKeyName];
51             }
52             return TopicName;
53         }
54     }
View Code

 

 

 

消息队列一

 

消息队列二

 

消息队列三

 

消息队列四

 

消息队列五

 

消息队列六

 

消息队列七

 

相关文章
Linux PWM接口概述 【ChatGPT】
Linux PWM接口概述 【ChatGPT】
函数计算产品使用问题之如何查看函数计算的QPS(每秒查询率)
函数计算产品作为一种事件驱动的全托管计算服务,让用户能够专注于业务逻辑的编写,而无需关心底层服务器的管理与运维。你可以有效地利用函数计算产品来支撑各类应用场景,从简单的数据处理到复杂的业务逻辑,实现快速、高效、低成本的云上部署与运维。以下是一些关于使用函数计算产品的合集和要点,帮助你更好地理解和应用这一服务。
171 0
Visual Studio2019安装闪退(不弹出下载界面)等问题解决方法
Visual Studio2019安装闪退(不弹出下载界面)等问题解决方法
1129 0
Visual Studio2019安装闪退(不弹出下载界面)等问题解决方法
Android系统 adb shell auth授权使用
Android系统 adb shell auth授权使用
1137 2
就AI 基础设施的演进与挑战问题之ZooKeeper的稳定性提升配置优化的问题如何解决
就AI 基础设施的演进与挑战问题之ZooKeeper的稳定性提升配置优化的问题如何解决
PostgreSQL孤儿文件
与所有其他关系数据库系统一样,PostgreSQL需要通过写入wal日志或在Checkpoint时同步数据到数据文件来持久化数据到磁盘上。对于数据文件,一旦Relation达到SEGMENT_SIZE(默认1GB),PostgreSQL就会创建一个新的数据文件。因此如果Relation持续增长,则该Relation可能会由多个文件组成。在这篇文章中想要考虑的问题是,是否可能存在孤儿文件。
设计模式 - 创建型模式_建造者模式
建造者模式所完成的内容就是通过将多个简单对象通过⼀步步的组装构建出⼀个复杂对象的过程。
138 0
设计模式 - 创建型模式_建造者模式
AI助理

你好,我是AI助理

可以解答问题、推荐解决方案等

登录插画

登录以查看您的控制台资源

管理云资源
状态一览
快捷访问