/// <summary>
/// Console entry point: reads the Kafka broker address and topic name from the
/// application's appSettings and prints every message consumed from that topic.
/// </summary>
class Program
{
    /// <summary>appSettings key holding the Kafka broker URI.</summary>
    private const string KafkaBrokerKeyName = "KafkaBroker";

    /// <summary>appSettings key holding the topic to consume.</summary>
    private const string TopicKeyName = "Topic";

    /// <summary>Broker URI used when <see cref="KafkaBrokerKeyName"/> is not configured.</summary>
    private const string DefaultKafkaBroker = "http://localhost:9092";

    /// <summary>
    /// Resolves the broker and topic from configuration and starts consuming.
    /// </summary>
    /// <param name="args">Command-line arguments (unused).</param>
    static void Main(string[] args)
    {
        Consume(GetKafkaBroker(), GetTopicName());
    }

    /// <summary>
    /// Connects to <paramref name="broker"/> and writes each message received on
    /// <paramref name="topic"/> to the console.
    /// </summary>
    /// <param name="broker">Absolute URI of the Kafka broker; passed to <see cref="Uri"/>.</param>
    /// <param name="topic">Name of the topic to consume.</param>
    private static void Consume(string broker, string topic)
    {
        var options = new KafkaOptions(new Uri(broker));

        // Wrap the router and consumer in using statements so they are released
        // if the stream below terminates with an exception.
        using (var router = new BrokerRouter(options))
        using (var consumer = new Consumer(new ConsumerOptions(topic, router)))
        {
            // Consume() returns a blocking IEnumerable (i.e. a never-ending stream),
            // so this loop only exits when an exception is thrown.
            foreach (var message in consumer.Consume())
            {
                Console.WriteLine("Response: Partition {0},Offset {1} : {2}",
                    message.Meta.PartitionId, message.Meta.Offset, message.Value.ToUtf8String());
            }
        }
    }

    /// <summary>
    /// Reads the broker URI from appSettings.
    /// </summary>
    /// <returns>
    /// The configured value of the "KafkaBroker" key, or
    /// <see cref="DefaultKafkaBroker"/> when the key is absent.
    /// </returns>
    private static string GetKafkaBroker()
    {
        // The indexer yields null for a missing key, so one lookup covers both cases.
        return ConfigurationManager.AppSettings[KafkaBrokerKeyName] ?? DefaultKafkaBroker;
    }

    /// <summary>
    /// Reads the topic name from appSettings.
    /// </summary>
    /// <returns>The configured value of the "Topic" key.</returns>
    /// <exception cref="ConfigurationErrorsException">
    /// The "Topic" key is not present in configuration/AppSettings.
    /// </exception>
    private static string GetTopicName()
    {
        string topicName = ConfigurationManager.AppSettings[TopicKeyName];

        // There is no sensible default topic, so a missing key is a configuration error.
        if (topicName == null)
        {
            throw new ConfigurationErrorsException("Key \"" + TopicKeyName + "\" not found in Config file -> configuration/AppSettings");
        }

        return topicName;
    }
}