【Kafka】基于Windows环境的Kafka有关环境(scala+zookeeper+kafka+可视化工具)搭建、以及使用.NET环境开发的案例代码与演示

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: 基于Windows系统下的Kafka环境搭建;以及使用.NET 6环境进行开发简单的生产者与消费者的演示。


前言:基于Windows系统下的Kafka环境搭建;以及使用.NET 6环境进行开发简单的生产者与消费者的演示。

 

一、环境部署


Kafka是使用Java语言和Scala语言开发的,所以需要有对应的Java环境,以及Scala语言环境。

Java环境配置,如果不清楚的,可以查看鄙人的另一篇博客:

https://www.cnblogs.com/weskynet/p/14852471.html

 

1、Scala环境安装,需要先下载Scala语言包,下载地址:

https://www.scala-lang.org/download/scala2.html

 

要选择Binaries版本的环境,否则需要自己编译:

1995789-20220121233845174-1968751283.png

 

2、Kafka基于Zookeeper环境运行,zookeeper提供给kafka一系列的功能支持,所以还需要安装Zookeeper有关的环境。下载zookeeper地址:

https://zookeeper.apache.org/releases.html#download

 

1995789-20220121234031423-1498852000.png

 

3、同样,Zookeeper也需要下载带bin 的链接,没有带bin的链接,可能是源码,需要自己编译:

 1995789-20220121234042119-1609696911.png

 

4、接下来是下载主角,Kafka了。下载地址:

https://kafka.apache.org/downloads.html

1995789-20220121234050129-939922519.png

 

5、同样需要选择下载binary版本,然后根据scala的版本选择对应的版本。

 

1995789-20220121234058837-471215852.png

 

6、下载的三个安装包,如图所示:


1995789-20220121234108295-591191618.png

 

7、先安装Scala语言包环境:

 1995789-20220121234115856-679419280.png

 

8、验证Scala语言包是否安装成功:

控制台窗口,输入:scala -version

如果提示类似如下有关版本信息,则代表安装成功。

1995789-20220121234124904-1502173649.png

 

9、然后是安装zookeeper环境。必须先启动zookeeper,才可以使用kafka。

安装zookeeper环境,先解压下载的包,然后在解压后的目录下新增data文件夹

1995789-20220121234133947-1236090861.png

 

10、然后复制data文件夹的绝对路径,备用。在conf文件夹下,编辑cfg文件

 1995789-20220121234142606-1476155472.png

 

11、在cfg文件内,修改dataDir指定为上面新建的data文件夹的绝对路径。注意路径是斜杠/,如果要使用 \ 反斜杆,需要写双反斜杠 \\

 1995789-20220121234149392-25967670.png

 

12、也要更改cfg格式的文件名称为 zoo.cfg 否则zookeeper无法识别配置文件。Zoo.cfg文件是zookeeper启动时候自动关联的默认配置文件名称。

1995789-20220121234157894-1612505954.png

 

13、然后新建环境变量 ZOOKEEPER_HOME:

1995789-20220121234214199-1906494318.png

 

14、环境变量path新增:%ZOOKEEPER_HOME%\bin


 1995789-20220121234222570-71977285.png

 

15、启动zookeeper,直接任意打开控制台,输入 zkServer

 1995789-20220121234233810-194473166.png

 

16、如果都没有报错,一般是启动成功了的。再次验证下,可以任意开个控制台,输入JPS进行查看,如下图所示,有JPS、也有QuorumPeerMain,代表zookeeper启动成功了。

1995789-20220121234244186-1681773635.png

 

17、Kafka环境安装。先解压,然后在解压后的目录下,新增logs文件夹

 1995789-20220121234254482-132794321.png

 

18、然后在Config文件夹下,修改 server.properties 文件,修改 log.dirs 的值为 新增的logs文件夹的绝对路径

1995789-20220121234304194-1008877938.png

 

19、进入到解压后的kafka目录下,在路径栏输入cmd,快速打开当前文件夹下的控制台窗口:

 1995789-20220121234313534-1713656591.png

 

20、输入命令:

.\bin\windows\kafka-server-start.bat .\config\server.properties

进行启动Kafka服务:

 1995789-20220121234323158-226262415.png

 

21、启动Kafka报错了,可能是版本问题,kafka一般新版本对windows环境不友好,所以降级一下。此处我把kafka3.0降级为2.8:

 1995789-20220121234334087-865269622.png

 

22、此处我下载的版本为 2.13-2.8.1,各位大佬们可以按照自己意愿选择版本。可能2.x版本和3.x版本跨度比较大,所以3.0版本没法玩。

1995789-20220121234342701-391875125.png

 

23、然后是重复以上配置kafka有关的动作,修改有关配置文件以及新增logs文件夹等。此处省略。

1995789-20220121234350125-800296702.png

 

24、接着在低版本的kafka目录下,快速进入当前解压缩的目录下,再次输入有关命令尝试一下:

 1995789-20220121234358770-362202472.png

 

25、没有提示错误,根据提示信息,代表是启动成功了。任意打开控制台,再输入JPS查看下,可以看到Kafka,确认是启动OK了。

 1995789-20220121234408479-1402417354.png

 

26、然后是要一款Kafka可视化工具,此处我选择使用offset explorer  (原来是叫kafka tools,如下载地址所示),下载地址:

https://www.kafkatool.com/download.html

 1995789-20220121234420501-1879684648.png

 

27、安装可视化工具,默认可以一直下一步:


 1995789-20220121234431434-637387667.png

 

28、可以在安装目录下把可执行程序发送到桌面快捷方式,方便打开。

 1995789-20220121234440190-171222634.png

 

29、一些配置,包括名称、kafka版本、端口号、服务地址等

1995789-20220121234450103-1848054763.png

 

30、连接以后的效果图,如下。Topic是空的,接下来写点代码。

1995789-20220121234459775-449142215.png

 

二、代码开发与测试


31、新建类库项目,当作kafka服务类库

1995789-20220121234509867-206063450.png

 

32、此处选择标准库2.1,用于可以给多种.net core版本使用,方便兼容。

 1995789-20220121234517490-311358850.png

  

33、引用 Confluent.Kafka 包。

 1995789-20220121234528358-2000910560.png

 

34、此处新增发布服务类和订阅服务类:

1995789-20220121234539075-1799001904.png

 

35、新增的生产者发布服务方法代码如下:

1995789-20220121234548599-1496443564.png


代码:


/// <summary>
    /// Description: Kafka生产者发布服务
    /// CreateTime: 2022/1/21 19:35:27
    /// Author: Wesky
    /// </summary>
    public class PublishService: IPublishService
    {
        public async Task PublishAsync<TMessage>(string broker, string topicName, TMessage message) where TMessage : class
        {
            var config = new ProducerConfig
            {
                BootstrapServers = broker, // kafka服务集群,例如  "192.168.0.1:9092,192.168.0.2:9092"   或者单机 "192.168.0.1:9092"
                Acks = Acks.All,
                MessageSendMaxRetries = 3, // 发送失败重试的次数
            };
            using (var producer = new ProducerBuilder<string, string>(config).Build())
            {
                try
                {
                    string data = Newtonsoft.Json.JsonConvert.SerializeObject(message);
                    var sendData = new Message<string, string> { Key = Guid.NewGuid().ToString("N"), Value = data};
                    var report =  await producer.ProduceAsync(topicName, sendData);
                    Console.WriteLine($"消息 >>>>>: {data} \r\n发送到:{report.TopicPartitionOffset}");
                }
                catch (ProduceException<string, string> ex)
                {
                    Console.WriteLine($"消息发送失败>>>>>:\r\n Code= {ex.Error.Code} >>> \r\nError= {ex.Message}");
                }
            }
        }
    }


36、新增的消费者接收服务方法代码如下:

1995789-20220121234557211-8566331.png

 

代码:


/// <summary>
    /// Description: kafka 消费者订阅服务
    /// CreateTime: 2022/1/21 19:36:25
    /// Author: Wesky
    /// </summary>
    public class SubscribeService: ISubscribeService
    {
        /// <summary>
        /// 消费者服务核心代码
        /// </summary>
        /// <typeparam name="TMessage"></typeparam>
        /// <param name="config">消费者配置信息</param>
        /// <param name="topics">主题集合</param>
        /// <param name="func"></param>
        /// <param name="cancellationToken"></param>
        /// <returns></returns>
        public async Task SubscribeAsync<TMessage>(ConsumerConfig config, IEnumerable<string> topics,  Action<TMessage> func, CancellationToken cancellationToken) where TMessage : class
        {
            const int commitPeriod = 1;
            using (var consumer = new ConsumerBuilder<Ignore, string>(config)
             .SetErrorHandler((_, e) =>
             {
                 Console.WriteLine($"消费错误 >>>>>: {e.Reason}");
             })
             .SetStatisticsHandler((_, json) =>
             {
                 Console.WriteLine($"************************************************");
             })
             .SetPartitionsAssignedHandler((c, partitionList) =>
             {
                 string partitions = string.Join(", ", partitionList);
                 Console.WriteLine($"分配的分区 >>>>> : {partitions}");
             })
             .SetPartitionsRevokedHandler((c, partitionList) =>
             {
                 string partitions = string.Join(", ", partitionList);
                 Console.WriteLine($"回收的分区 >>>>> : {partitions}");
             })
             .Build())
            {
                consumer.Subscribe(topics);
                try
                {
                    while (true)
                    {
                        try
                        {
                            var consumeResult = consumer.Consume(cancellationToken);
                            if (consumeResult.IsPartitionEOF)
                            {
                                continue;
                            }
                            if(consumeResult?.Offset % commitPeriod == 0){
                                try
                                {
                                    var result = JsonConvert.DeserializeObject<TMessage>(consumeResult.Message?.Value);
                                    func(result); // 消费消息
                                }
                                catch (Exception ex)
                                {
                                    Console.WriteLine($"消费业务处理失败: {ex.Message}");
                                }
                                try
                                {
                                    consumer.Commit(consumeResult);  // 手动提交
                                    Console.WriteLine($"消费者消费完成,已提交 ");
                                }
                                catch (KafkaException e)
                                {
                                    Console.WriteLine($"提交错误 >>>>> : {e.Error.Reason}");
                                }
                            }
                        }
                        catch (ConsumeException e)
                        {
                            Console.WriteLine($"消费错误>>>>> : {e.Error.Reason}");
                        }
                    }
                }
                catch (Exception e)
                {
                    Console.WriteLine($"其他错误 >>>>> :{e.Message}");
                    consumer.Close();
                }
            }
            await Task.CompletedTask;
        }
    }

37、并且提供对应的接口服务,用于开放给外部调用,或者提供依赖注入使用:

 1995789-20220121234804184-557736248.png

 

38、新建一个控制台项目,用来当作消费者端的测试,并且新增一个方法,用来当作消费者接收到消息以后的业务处理方法体。此处控制台环境版本为.NET 6

 1995789-20220121234823876-789291207.png

 

39、消费客户端代码如下。其中,BootstrapServers也可以提供集群地址,例如 ip1:port,ip2:port…… 服务之间以半角逗号隔开。

1995789-20220121234853576-80216115.png

 

40、再新增一个webapi项目,用来当作生产者的客户端进行发送数据。以及对kafka服务类部分进行依赖注入注册,此处使用单例。该webapi此处使用.NET 6环境,带有控制器的模式。

 1995789-20220121234910776-1748652765.png

 

41、新增的控制器里面,进行生产者的注入与实现。注意:topicName参数对应上边的topic-wesky,通过主题绑定,否则消费者不认识就没办法消费到了。

1995789-20220121234923901-195310594.png


控制器代码:


[Route("api/[controller]/[action]")]
    [ApiController]
    public class ProducerController : ControllerBase
    {
        IPublishService _service = null;
        public ProducerController(IPublishService publishService)
        {
            _service = publishService;
        }
        [HttpPost]
        public IActionResult SendMessage(string broker,string topicName,string message)
        {
            _service.PublishAsync(broker, topicName, message);
            return Ok();
        }
    }


42、接下来是一些测试,如图所示:

1995789-20220121235056596-2037531808.png

 

43、最后,使用可视化管理工具Offset进行查看,可以看到对应的主题。选中主题,可以设置数据类型,这里我设置为字符串,就可以查看到对应的消息内容了。如果没有设置,默认是16进制的数据。

1995789-20220121235112885-1315866318.png

 

44、查看刚刚测试时候收发的消息队列里面的数据,如下所示:

1995789-20220121235120770-836684877.png

 

45、一些额外补充:

Kafka也是消息队列的一种,用于在高吞吐量场景下使用比较适合。如果是轻量级的,只需要用于削峰,可以使用RabbitMQ。

以上只是简单的操作演示,至于要用得溜,观众朋友们可以自行补充所需的相关理论知识。

可视化工具还有一款yahoo提供的开源的工具,叫kafka-manager,有兴趣的大佬们可以自行玩玩,开源地址:

https://github.com/yahoo/CMAK

 

还有一款滴滴平台做的开源的kafka运维管理平台,有兴趣的大佬们也可以自行了解,地址:

https://github.com/didi/LogiKM

 

 

以上就是该博客的全部内容,感谢各位大佬们的观看~


目录
相关文章
|
5天前
|
缓存 数据库 索引
everything 本地文件搜索工具 完胜WIndows搜索 速度99% 超级给力
everything 本地文件搜索工具 完胜WIndows搜索 速度99% 超级给力
|
9天前
|
监控 安全 虚拟化
深入浅出:NSSM封装Windows服务工具的使用与介绍
深入浅出:NSSM封装Windows服务工具的使用与介绍
21 3
|
12天前
|
人工智能 量子技术 C#
【专栏】.NET 开发:开启数字化新时代
【4月更文挑战第29天】.NET开发在数字化新时代中发挥关键作用,借助跨平台能力、高性能和现代编程语言支持,如C#,助力企业实现数字化转型。通过企业级应用开发、移动应用和云计算集成,.NET加速业务流程和提升用户体验。未来,.NET将涉足AI、ML、MR/AR及量子计算,持续推动技术创新和数字化转型。开发者应提升技能,适应高性能需求,把握发展机遇。
|
12天前
|
缓存 监控 算法
【专栏】.NET 开发:实现卓越性能的途径
【4月更文挑战第29天】本文探讨了.NET开发中的性能优化,强调了理解性能问题根源和使用分析工具的重要性。基础优化包括代码优化(如减少计算、避免内存泄漏)、资源管理及选择合适算法。高级策略涉及并行编程、缓存策略、预编译(AOT)和微服务架构。持续性能测试与监控是关键,包括性能测试、监控分析和建立优化反馈循环。开发者应持续学习和实践性能优化,以构建高性能应用。
|
12天前
|
开发框架 .NET C#
【专栏】理解.NET 技术,提升开发水平
【4月更文挑战第29天】本文介绍了.NET技术的核心概念和应用,包括其跨平台能力、性能优化、现代编程语言支持及Web开发等特性。文章强调了深入学习.NET技术、关注社区动态、实践经验及学习现代编程理念对提升开发水平的重要性。通过这些,开发者能更好地利用.NET构建高效、可维护的多平台应用。
|
12天前
|
机器学习/深度学习 vr&ar 开发者
【专栏】.NET 技术:引领开发新方向
【4月更文挑战第29天】本文探讨了.NET技术如何引领软件开发新方向,主要体现在三方面:1) 作为跨平台开发的先锋,.NET Core支持多操作系统和移动设备,借助.NET MAUI创建统一UI,适应物联网需求;2) 提升性能和开发者生产力,采用先进技术和优化策略,同时更新C#语言特性,提高代码效率和可维护性;3) 支持现代化应用架构,包括微服务、容器化,集成Kubernetes和ASP.NET Core,保障安全性。此外,.NET还不断探索AI、ML和AR/VR技术,为软件开发带来更多创新可能。
|
12天前
|
物联网 vr&ar 开发者
【专栏】.NET 技术:为开发注入活力
【4月更文挑战第29天】本文探讨了.NET技术的创新,主要体现在三个方面:1) .NET Core实现跨平台开发革命,支持多种操作系统和硬件,如.NET MAUI用于多平台UI;2) 性能提升与生产力飞跃,C#新特性简化编程,JIT和AOT优化提升性能,Roslyn提供代码分析工具;3) 引领现代化应用架构,支持微服务、容器化,内置安全机制。未来,.NET 7将带来更多新特性和前沿技术整合,如量子计算、AI,持续推动软件开发创新。开发者掌握.NET技术将赢得竞争优势。
|
12天前
|
人工智能 前端开发 Cloud Native
【专栏】洞察.NET 技术的开发趋势
【4月更文挑战第29天】本文探讨了.NET技术的三大发展趋势:1) 跨平台与云原生技术融合,通过.NET Core支持轻量级、高性能应用,适应云计算和微服务;2) 人工智能与机器学习的集成,如ML.NET框架,使开发者能用C#构建AI模型;3) 引入现代化前端开发技术,如Blazor,实现前后端一致性。随着.NET 8等新版本的发布,期待更多创新技术如量子计算、AR/VR的融合,.NET将持续推动软件开发的创新与进步。
|
3月前
|
消息中间件 安全 Kafka
2024年了,如何更好的搭建Kafka集群?
我们基于Kraft模式和Docker Compose同时采用最新版Kafka v3.6.1来搭建集群。
484 2
2024年了,如何更好的搭建Kafka集群?
|
4月前
|
消息中间件 存储 数据可视化
kafka高可用集群搭建
kafka高可用集群搭建
49 0