使用.net客户端连接到Kafka
这里推荐使用一个开源.net客户端:https://github.com/Jroland/kafka-net
Producer
1 var options = new KafkaOptions(new Uri("http://SERVER1:9092"), new Uri("http://SERVER2:9092")); 2 var router = new BrokerRouter(options); 3 var client = new Producer(router); 4 5 client.SendMessageAsync("TestHarness", new[] { new Message("hello world")}).Wait(); 6 7 using (client) { }
Consumer
1 var options = new KafkaOptions(new Uri("http://SERVER1:9092"), new Uri("http://SERVER2:9092")); 2 var router = new BrokerRouter(options); 3 var consumer = new Consumer(new ConsumerOptions("TestHarness", router)); 4 5 //Consume returns a blocking IEnumerable (ie: never ending stream) 6 foreach (var message in consumer.Consume()) 7 { 8 Console.WriteLine("Response: P{0},O{1} : {2}", 9 message.Meta.PartitionId, message.Meta.Offset, message.Value); 10 }
完整测试代码
Producer生产者:
1 class Program 2 { 3 static void Main(string[] args) 4 { 5 do 6 { 7 Produce(GetKafkaBroker(), getTopicName()); 8 System.Threading.Thread.Sleep(3000); 9 } while (true); 10 } 11 12 private static void Produce(string broker, string topic) 13 { 14 var options = new KafkaOptions(new Uri(broker)); 15 var router = new BrokerRouter(options); 16 var client = new Producer(router); 17 18 var currentDatetime =DateTime.Now; 19 var key = currentDatetime.Second.ToString(); 20 var events = new[] { new Message("Hello World " + currentDatetime, key) }; 21 client.SendMessageAsync(topic, events).Wait(1500); 22 Console.WriteLine("Produced: Key: {0}. Message: {1}", key, events[0].Value.ToUtf8String()); 23 24 using (client) { } 25 } 26 27 private static string GetKafkaBroker() 28 { 29 string KafkaBroker = string.Empty; 30 const string kafkaBrokerKeyName = "KafkaBroker"; 31 32 if (!ConfigurationManager.AppSettings.AllKeys.Contains(kafkaBrokerKeyName)) 33 { 34 KafkaBroker = "http://localhost:9092"; 35 } 36 else 37 { 38 KafkaBroker = ConfigurationManager.AppSettings[kafkaBrokerKeyName]; 39 } 40 return KafkaBroker; 41 } 42 private static string getTopicName() 43 { 44 string TopicName = string.Empty; 45 const string topicNameKeyName = "Topic"; 46 47 if (!ConfigurationManager.AppSettings.AllKeys.Contains(topicNameKeyName)) 48 { 49 throw new Exception("Key \"" + topicNameKeyName + "\" not found in Config file -> configuration/AppSettings"); 50 } 51 else 52 { 53 TopicName = ConfigurationManager.AppSettings[topicNameKeyName]; 54 } 55 return TopicName; 56 } 57 }
Consumer消费者:
1 class Program 2 { 3 static void Main(string[] args) 4 { 5 Consume(getKafkaBroker(), getTopicName()); 6 7 } 8 9 private static void Consume(string broker, string topic) 10 { 11 var options = new KafkaOptions(new Uri(broker)); 12 var router = new BrokerRouter(options); 13 var consumer = new Consumer(new ConsumerOptions(topic, router)); 14 15 //Consume returns a blocking IEnumerable (ie: never ending stream) 16 foreach (var message in consumer.Consume()) 17 { 18 Console.WriteLine("Response: Partition {0},Offset {1} : {2}", 19 message.Meta.PartitionId, message.Meta.Offset, message.Value.ToUtf8String()); 20 } 21 } 22 23 private static string getKafkaBroker() 24 { 25 string KafkaBroker = string.Empty; 26 var KafkaBrokerKeyName = "KafkaBroker"; 27 28 if (!ConfigurationManager.AppSettings.AllKeys.Contains(KafkaBrokerKeyName)) 29 { 30 KafkaBroker = "http://localhost:9092"; 31 } 32 else 33 { 34 KafkaBroker = ConfigurationManager.AppSettings[KafkaBrokerKeyName]; 35 } 36 return KafkaBroker; 37 } 38 39 private static string getTopicName() 40 { 41 string TopicName = string.Empty; 42 var TopicNameKeyName = "Topic"; 43 44 if (!ConfigurationManager.AppSettings.AllKeys.Contains(TopicNameKeyName)) 45 { 46 throw new Exception("Key \"" + TopicNameKeyName + "\" not found in Config file -> configuration/AppSettings"); 47 } 48 else 49 { 50 TopicName = ConfigurationManager.AppSettings[TopicNameKeyName]; 51 } 52 return TopicName; 53 } 54 }
消息队列一
消息队列二
消息队列三
消息队列四
消息队列五
消息队列六
消息队列七