消息队列七

简介: 使用.net客户端连接到Kafka 这里推荐使用一个开源.net客户端:https://github.com/Jroland/kafka-net Producer 1 var options = new KafkaOptions(new Uri("http://SERVER1:90...

使用.net客户端连接到Kafka

这里推荐使用一个开源.net客户端:https://github.com/Jroland/kafka-net

Producer
1 var options = new KafkaOptions(new Uri("http://SERVER1:9092"), new Uri("http://SERVER2:9092"));
2 var router = new BrokerRouter(options);
3 var client = new Producer(router);
4 
5 client.SendMessageAsync("TestHarness", new[] { new Message("hello world")}).Wait();
6 
7 using (client) { }

 

Consumer
 1 var options = new KafkaOptions(new Uri("http://SERVER1:9092"), new Uri("http://SERVER2:9092"));
 2 var router = new BrokerRouter(options);
 3 var consumer = new Consumer(new ConsumerOptions("TestHarness", router));
 4 
 5 //Consume returns a blocking IEnumerable (ie: never ending stream)
 6 foreach (var message in consumer.Consume())
 7 {
 8     Console.WriteLine("Response: P{0},O{1} : {2}", 
 9         message.Meta.PartitionId, message.Meta.Offset, message.Value);  
10 }

 

完整测试代码

Producer生产者:

 1 class Program
 2     {
 3         static void Main(string[] args)
 4         {
 5             do
 6             {
 7                 Produce(GetKafkaBroker(), getTopicName());
 8                 System.Threading.Thread.Sleep(3000);
 9             } while (true);
10         }
11 
12         private static void Produce(string broker, string topic)
13         {
14             var options = new KafkaOptions(new Uri(broker));
15             var router = new BrokerRouter(options);
16             var client = new Producer(router);
17 
18             var currentDatetime =DateTime.Now;
19             var key = currentDatetime.Second.ToString();
20             var events = new[] { new Message("Hello World " + currentDatetime, key) };
21             client.SendMessageAsync(topic, events).Wait(1500);
22             Console.WriteLine("Produced: Key: {0}. Message: {1}", key, events[0].Value.ToUtf8String());
23 
24             using (client) { }
25         }
26 
27         private static string GetKafkaBroker()
28         {
29             string KafkaBroker = string.Empty;
30             const string kafkaBrokerKeyName = "KafkaBroker";
31 
32             if (!ConfigurationManager.AppSettings.AllKeys.Contains(kafkaBrokerKeyName))
33             {
34                 KafkaBroker = "http://localhost:9092";
35             }
36             else
37             {
38                 KafkaBroker = ConfigurationManager.AppSettings[kafkaBrokerKeyName];
39             }
40             return KafkaBroker;
41         }
42         private static string getTopicName()
43         {
44             string TopicName = string.Empty;
45             const string topicNameKeyName = "Topic";
46 
47             if (!ConfigurationManager.AppSettings.AllKeys.Contains(topicNameKeyName))
48             {
49                 throw new Exception("Key \"" + topicNameKeyName + "\" not found in Config file -> configuration/AppSettings");
50             }
51             else
52             {
53                 TopicName = ConfigurationManager.AppSettings[topicNameKeyName];
54             }
55             return TopicName;
56         }
57     }

 

Consumer消费者:

 1 class Program
 2     {
 3         static void Main(string[] args)
 4         {
 5             Consume(getKafkaBroker(), getTopicName());
 6 
 7         }
 8 
 9         private static void Consume(string broker, string topic)
10         {   
11             var options = new KafkaOptions(new Uri(broker));
12             var router = new BrokerRouter(options);
13             var consumer = new Consumer(new ConsumerOptions(topic, router));
14 
15             //Consume returns a blocking IEnumerable (ie: never ending stream)
16             foreach (var message in consumer.Consume())
17             {
18                 Console.WriteLine("Response: Partition {0},Offset {1} : {2}",
19                     message.Meta.PartitionId, message.Meta.Offset, message.Value.ToUtf8String());
20             }
21         }
22 
23         private static string getKafkaBroker()
24         {
25             string KafkaBroker = string.Empty;
26             var KafkaBrokerKeyName = "KafkaBroker";
27 
28             if (!ConfigurationManager.AppSettings.AllKeys.Contains(KafkaBrokerKeyName))
29             {
30                 KafkaBroker = "http://localhost:9092";
31             }
32             else
33             {
34                 KafkaBroker = ConfigurationManager.AppSettings[KafkaBrokerKeyName];
35             }
36             return KafkaBroker;
37         }
38 
39         private static string getTopicName()
40         {
41             string TopicName = string.Empty;
42             var TopicNameKeyName = "Topic";
43 
44             if (!ConfigurationManager.AppSettings.AllKeys.Contains(TopicNameKeyName))
45             {
46                 throw new Exception("Key \"" + TopicNameKeyName + "\" not found in Config file -> configuration/AppSettings");
47             }
48             else
49             {
50                 TopicName = ConfigurationManager.AppSettings[TopicNameKeyName];
51             }
52             return TopicName;
53         }
54     }
View Code

 

 

 

消息队列一

 

消息队列二

 

消息队列三

 

消息队列四

 

消息队列五

 

消息队列六

 

消息队列七

 

目录
相关文章
|
6天前
|
消息中间件 大数据 Java
什么是消息队列
什么是消息队列
29 0
|
30天前
|
消息中间件 微服务
消息队列的适用场景
消息队列的适用场景
11 0
|
5月前
|
消息中间件 开发框架 Java
消息队列使用的四种场景介绍(二)
消息队列使用的四种场景介绍
|
10月前
|
消息中间件 存储 弹性计算
消息队列详解与应用
消息队列详解与应用
135 0
|
10月前
|
消息中间件 负载均衡 Java
什么是优秀的消息队列
简述消息队列,优秀的消息队列的特质及RoketMQ
|
10月前
|
消息中间件 存储 容灾
优秀的消息队列
优秀的消息队列
50 1
|
消息中间件 网络协议
消息队列(二)
消息队列(二)
121 0
消息队列(二)
|
消息中间件 Java 数据库
消息队列(五)
消息队列(五)
117 0
消息队列(五)
|
消息中间件 数据库
什么时候需要消息队列
什么时候需要消息队列
175 0
|
消息中间件 存储 运维
消息队列浅析
消息队列在大型分布式应用中非常常见,目前还停留在能熟练使用基础功能的阶段,对其高级功能以及背后的原理了解甚少,比如事务消息、有序消息等。 学习之前需要先带着几个问题,为什么会诞生消息队列?消息队列的原理是什么?使用消息队列需要注意什么?
192 0