消息队列七

简介: 使用.net客户端连接到Kafka 这里推荐使用一个开源.net客户端:https://github.com/Jroland/kafka-net Producer 1 var options = new KafkaOptions(new Uri("http://SERVER1:90...

使用.net客户端连接到Kafka

这里推荐使用一个开源.net客户端:https://github.com/Jroland/kafka-net

Producer
1 var options = new KafkaOptions(new Uri("http://SERVER1:9092"), new Uri("http://SERVER2:9092"));
2 var router = new BrokerRouter(options);
3 var client = new Producer(router);
4 
5 client.SendMessageAsync("TestHarness", new[] { new Message("hello world")}).Wait();
6 
7 using (client) { }

 

Consumer
 1 var options = new KafkaOptions(new Uri("http://SERVER1:9092"), new Uri("http://SERVER2:9092"));
 2 var router = new BrokerRouter(options);
 3 var consumer = new Consumer(new ConsumerOptions("TestHarness", router));
 4 
 5 //Consume returns a blocking IEnumerable (ie: never ending stream)
 6 foreach (var message in consumer.Consume())
 7 {
 8     Console.WriteLine("Response: P{0},O{1} : {2}", 
 9         message.Meta.PartitionId, message.Meta.Offset, message.Value);  
10 }

 

完整测试代码

Producer生产者:

 1 class Program
 2     {
 3         static void Main(string[] args)
 4         {
 5             do
 6             {
 7                 Produce(GetKafkaBroker(), getTopicName());
 8                 System.Threading.Thread.Sleep(3000);
 9             } while (true);
10         }
11 
12         private static void Produce(string broker, string topic)
13         {
14             var options = new KafkaOptions(new Uri(broker));
15             var router = new BrokerRouter(options);
16             var client = new Producer(router);
17 
18             var currentDatetime =DateTime.Now;
19             var key = currentDatetime.Second.ToString();
20             var events = new[] { new Message("Hello World " + currentDatetime, key) };
21             client.SendMessageAsync(topic, events).Wait(1500);
22             Console.WriteLine("Produced: Key: {0}. Message: {1}", key, events[0].Value.ToUtf8String());
23 
24             using (client) { }
25         }
26 
27         private static string GetKafkaBroker()
28         {
29             string KafkaBroker = string.Empty;
30             const string kafkaBrokerKeyName = "KafkaBroker";
31 
32             if (!ConfigurationManager.AppSettings.AllKeys.Contains(kafkaBrokerKeyName))
33             {
34                 KafkaBroker = "http://localhost:9092";
35             }
36             else
37             {
38                 KafkaBroker = ConfigurationManager.AppSettings[kafkaBrokerKeyName];
39             }
40             return KafkaBroker;
41         }
42         private static string getTopicName()
43         {
44             string TopicName = string.Empty;
45             const string topicNameKeyName = "Topic";
46 
47             if (!ConfigurationManager.AppSettings.AllKeys.Contains(topicNameKeyName))
48             {
49                 throw new Exception("Key \"" + topicNameKeyName + "\" not found in Config file -> configuration/AppSettings");
50             }
51             else
52             {
53                 TopicName = ConfigurationManager.AppSettings[topicNameKeyName];
54             }
55             return TopicName;
56         }
57     }

 

Consumer消费者:

 1 class Program
 2     {
 3         static void Main(string[] args)
 4         {
 5             Consume(getKafkaBroker(), getTopicName());
 6 
 7         }
 8 
 9         private static void Consume(string broker, string topic)
10         {   
11             var options = new KafkaOptions(new Uri(broker));
12             var router = new BrokerRouter(options);
13             var consumer = new Consumer(new ConsumerOptions(topic, router));
14 
15             //Consume returns a blocking IEnumerable (ie: never ending stream)
16             foreach (var message in consumer.Consume())
17             {
18                 Console.WriteLine("Response: Partition {0},Offset {1} : {2}",
19                     message.Meta.PartitionId, message.Meta.Offset, message.Value.ToUtf8String());
20             }
21         }
22 
23         private static string getKafkaBroker()
24         {
25             string KafkaBroker = string.Empty;
26             var KafkaBrokerKeyName = "KafkaBroker";
27 
28             if (!ConfigurationManager.AppSettings.AllKeys.Contains(KafkaBrokerKeyName))
29             {
30                 KafkaBroker = "http://localhost:9092";
31             }
32             else
33             {
34                 KafkaBroker = ConfigurationManager.AppSettings[KafkaBrokerKeyName];
35             }
36             return KafkaBroker;
37         }
38 
39         private static string getTopicName()
40         {
41             string TopicName = string.Empty;
42             var TopicNameKeyName = "Topic";
43 
44             if (!ConfigurationManager.AppSettings.AllKeys.Contains(TopicNameKeyName))
45             {
46                 throw new Exception("Key \"" + TopicNameKeyName + "\" not found in Config file -> configuration/AppSettings");
47             }
48             else
49             {
50                 TopicName = ConfigurationManager.AppSettings[TopicNameKeyName];
51             }
52             return TopicName;
53         }
54     }
View Code

 

 

 

消息队列一

 

消息队列二

 

消息队列三

 

消息队列四

 

消息队列五

 

消息队列六

 

消息队列七

 

目录
相关文章
|
消息中间件 存储 数据库
消息队列有什么用
通过异步处理提高系统性能(减少响应所需时间) 削峰/限流 降低系统耦合性。
98 0
|
消息中间件 开发框架 Java
消息队列使用的四种场景介绍(二)
消息队列使用的四种场景介绍
|
消息中间件 负载均衡 Java
什么是优秀的消息队列
简述消息队列,优秀的消息队列的特质及RoketMQ
|
消息中间件 存储 容灾
优秀的消息队列
优秀的消息队列
65 1
|
消息中间件 数据库
|
消息中间件 网络协议
消息队列(二)
消息队列(二)
149 0
消息队列(二)
|
消息中间件 存储 中间件
消息队列(四)
消息队列(四)
203 0
消息队列(四)
|
消息中间件 存储 缓存
消息队列(三)
消息队列(三)
127 0
消息队列(三)
|
消息中间件 存储 缓存
消息队列(六)
消息队列(六)
241 0
消息队列(六)
|
消息中间件 SQL 关系型数据库
消息队列
消息队列
233 0