【.NET+MQTT】.NET6 环境下实现MQTT通信,以及服务端、客户端的双边消息订阅与发布的代码演示

简介: MQTT广泛应用于工业物联网、智能家居、各类智能制造或各类自动化场景等。MQTT是一个基于客户端-服务器的消息发布/订阅传输协议,在很多受限的环境下,比如说机器与机器通信、机器与物联网通信等。好了,科普的废话不多说,下面直接通过.NET环境来实现一套MQTT通信demo,实现服务端与客户端的双边消息发布与订阅的功能和演示。


前言: MQTT广泛应用于工业物联网、智能家居、各类智能制造或各类自动化场景等。MQTT是一个基于客户端-服务器的消息发布/订阅传输协议,在很多受限的环境下,比如说机器与机器通信、机器与物联网通信等。好了,科普的废话不多说,下面直接通过.NET环境来实现一套MQTT通信demo,实现服务端与客户端的双边消息发布与订阅的功能和演示。

 

开发环境:

VS2022 + .NET 6 + Webapi / 控制台

 

1、新建一个webapi项目,用来后面做测试使用


1995789-20220703194134355-483218839.png


2、新建一个继承自IHostedService的服务,用于随着webapi程序的启动而自动执行。(最终代码在文末)


1995789-20220703194205010-1299395058.png


3、引入 MQTTNet 包,该项目提供了.net环境下的MQTT通信协议支持,这款框架很优秀,此处直接引用它来进行使用。


1995789-20220703194345070-1810279689.png


4、在上面的MqttHostService类里面,开始方法里面新增初始化MQTT服务端的一些功能,例如 IP、端口号、事件等等。


1995789-20220703194636641-1567439202.png


5、mqtt服务端支持的一系列功能很多,大佬们可以自行去尝试一些新发现,此处只使用若干个简单功能。


1995789-20220703194830275-502253778.png


6、添加客户端连接事件、连接关闭事件


1995789-20220703194928106-1460692685.png

 

7、由于事件要用的可能有点多,此处就不一一例举了,可以直接看以下的代码,以及有关注释来理解。


1995789-20220703195009822-97634286.png


8、事件触发时候,打印输出


1995789-20220703195107565-214206639.png

 

9、输出之前,记录一个当前事件名称标记一下,用于可以更加清楚看出是哪个事件输出的。


1995789-20220703215235523-678532482.png

 

10、对MqttHostService类进行注册,用于程序启动时候跟随启动。


1995789-20220703215345229-1564643322.png

 

11、上面貌似设计的不是很友好,所以把mqtt服务实例单独弄出来,写入到单独的类里面做成属性,供方便调用。

 1995789-20220703215441638-987500302.png


12、把先前的一些东西改一下,换成使用上面步骤的属性来直接调用使用。


1995789-20220703215547355-2106860901.png


13、运行一下,看看是否可以成功,显示服务已启动,说明服务启动时OK的了.


1995789-20220703215702808-1751344784.png

 

14、新增一个控制台程序 MqttClient,用于模拟客户端。


1995789-20220703215756942-1232256291.png


15、创建客户端启动以及有关配置信息和有关事件,如图。具体使用可以看代码注释,就不过多解释了。

 1995789-20220703215840102-114291994.png


16、在program类里面,调用客户端启动方法,用于测试使用。


1995789-20220703215941108-1286692271.png


17、上面客户端对应的三个事件的实现如图,同时进行有关信息的打印输出。


1995789-20220703220050242-1487085793.png

 

18、启动服务端,然后启动客户端,可以看到服务端有一个连接失败的消息,这个是因为上面配置的客户端用户名是admin,密码是1234567,而服务端配置的规则是,用户名是admin  密码是123456

1995789-20220703220148788-635156660.png

 

19、密码改回正常匹配项以后,再重新运行试试看,可以看到客户端与服务端连接上了。


1995789-20220703220343280-644593861.png

 

20、如果关闭客户端,也可以看到服务端会进入客户端关闭事件内。

 1995789-20220703220439491-842049005.png


21、把上面主题订阅的内容写到连接成功以后的事件里面,不然客户端连接期间,可能就执行了主题订阅,会存在订阅失败的情况。改为写入到连接成功以后的事件里面,可以保证主题订阅肯定是在客户端连接成功以后才执行的。


1995789-20220703220523956-1567473251.png

 

22、接下来测试服务端消息推送,在MqttService服务里面,新增一个方法,用来执行mqtt服务端发布主题消息使用。有关配置信息和消息格式,如图所示。


1995789-20220703220719847-1172207829.png

 

23、新增一个API控制器,用来测试使用。API参数直接拿来进行消息的推送使用。


1995789-20220703220840811-199730676.png


24、运行服务端和客户端,并访问刚刚新增的api接口,手动随意输入一条消息,可以看到客户端订阅的主题消息已经被实时接收到了。


1995789-20220703220958165-278871284.png

 

25、接下来对客户端新增一个消息推送的方法,用来测试客户端消息发布的功能。有关消息格式和调用,如图所示,以及注释部分的说明。


1995789-20220703221108865-807389405.png

 

26、客户端program类里面,客户端连接以后,通过手动回车,来执行客户端发布消息。


1995789-20220703221255436-685914373.png


27、再次启动服务端和客户端


1995789-20220703221349692-703611602.png


28、然后客户端内按一下回车,执行消息发布功能。可以看到,服务端成功接收到了客户端发过来的主题消息。


1995789-20220703221428986-1395486033.png


29、接下来测试客户端与客户端之间的消息发布与订阅,为了模拟多客户端效果,把上面客户端已经编译好的文件拷贝一份出来。


1995789-20220703221534107-1276617126.png


30、然后本地的代码进行一些修改,用来当做第二个客户端程序。所以客户端id也进行变更为 testclient02


1995789-20220703221658862-1953759631.png


31、对客户端订阅的主题,也改成 topic_02


1995789-20220703221807134-1328871422.png


32、启动服务端,以及拷贝出来的客户端1,和上面修改了部分代码的客户端2,保证都已经连接上服务端。


1995789-20220703221837670-178570046.png


33、调用服务端的api接口,由于服务端发布的消息是发布给topic_01的,所以只有客户端1可以接收到消息。


1995789-20220703221930141-1503226518.png

 

34、客户端1执行回车,用于发布一段消息给主题 topic_02,可以看到客户端01发布的消息,同时被服务端和客户端02接收到了。因为服务端是总指挥,所以客户端发布的消息都会经过服务端,从而服务端都可以接收到连接的客户端发布的所有消息。


1995789-20220703222118765-1358166854.png

 

35、测试数据保持,下面先对客户端1进行断开,然后再重新连接客户端1,可以看到客户端1直接接收到了它订阅的主题的上一次最新的消息内容,这个就是消息里面,Retain属性设为True的结果,用于让服务端记忆该主题消息使用的。如果设为false,就没有这个效果了,大佬们也可以自己尝试。


1995789-20220703222414564-2056736783.png


36、最终的服务端代码:

MqttHostService:


public class MqttHostService : IHostedService, IDisposable
    {
        public void Dispose()
        {
        }
        const string ServerClientId = "SERVER";
        public Task StartAsync(CancellationToken cancellationToken)
        {
            MqttServerOptionsBuilder optionsBuilder = new MqttServerOptionsBuilder();
            optionsBuilder.WithDefaultEndpoint();
            optionsBuilder.WithDefaultEndpointPort(10086); // 设置 服务端 端口号
            optionsBuilder.WithConnectionBacklog(1000); // 最大连接数
            MqttServerOptions options = optionsBuilder.Build();
            MqttService._mqttServer = new MqttFactory().CreateMqttServer(options);
            MqttService._mqttServer.ClientConnectedAsync += _mqttServer_ClientConnectedAsync; //客户端连接事件
            MqttService._mqttServer.ClientDisconnectedAsync += _mqttServer_ClientDisconnectedAsync; // 客户端关闭事件
            MqttService._mqttServer.ApplicationMessageNotConsumedAsync += _mqttServer_ApplicationMessageNotConsumedAsync; // 消息接收事件
            MqttService._mqttServer.ClientSubscribedTopicAsync += _mqttServer_ClientSubscribedTopicAsync; // 客户端订阅主题事件
            MqttService._mqttServer.ClientUnsubscribedTopicAsync += _mqttServer_ClientUnsubscribedTopicAsync; // 客户端取消订阅事件
            MqttService._mqttServer.StartedAsync += _mqttServer_StartedAsync; // 启动后事件
            MqttService._mqttServer.StoppedAsync += _mqttServer_StoppedAsync; // 关闭后事件
            MqttService._mqttServer.InterceptingPublishAsync += _mqttServer_InterceptingPublishAsync; // 消息接收事件
            MqttService._mqttServer.ValidatingConnectionAsync += _mqttServer_ValidatingConnectionAsync; // 用户名和密码验证有关
            MqttService._mqttServer.StartAsync();
            return Task.CompletedTask;
        }
        /// <summary>
        /// 客户端订阅主题事件
        /// </summary>
        /// <param name="arg"></param>
        /// <returns></returns>
        private Task _mqttServer_ClientSubscribedTopicAsync(ClientSubscribedTopicEventArgs arg)
        {
            Console.WriteLine($"ClientSubscribedTopicAsync:客户端ID=【{arg.ClientId}】订阅的主题=【{arg.TopicFilter}】 ");
            return Task.CompletedTask;
        }
        /// <summary>
        /// 关闭后事件
        /// </summary>
        /// <param name="arg"></param>
        /// <returns></returns>
        private Task _mqttServer_StoppedAsync(EventArgs arg)
        {
            Console.WriteLine($"StoppedAsync:MQTT服务已关闭……");
            return Task.CompletedTask;
        }
        /// <summary>
        /// 用户名和密码验证有关
        /// </summary>
        /// <param name="arg"></param>
        /// <returns></returns>
        private Task _mqttServer_ValidatingConnectionAsync(ValidatingConnectionEventArgs arg)
        {
            arg.ReasonCode = MqttConnectReasonCode.Success;
            if ((arg.Username ?? string.Empty)!="admin" || (arg.Password??String.Empty)!="123456")
            {
                arg.ReasonCode = MqttConnectReasonCode.Banned;
                Console.WriteLine($"ValidatingConnectionAsync:客户端ID=【{arg.ClientId}】用户名或密码验证错误 ");
            }
            return Task.CompletedTask;
        }
        /// <summary>
        /// 消息接收事件
        /// </summary>
        /// <param name="arg"></param>
        /// <returns></returns>
        private Task _mqttServer_InterceptingPublishAsync(InterceptingPublishEventArgs arg)
        {
            if (string.Equals(arg.ClientId, ServerClientId))
            {
                return Task.CompletedTask;
            }
            Console.WriteLine($"InterceptingPublishAsync:客户端ID=【{arg.ClientId}】 Topic主题=【{arg.ApplicationMessage.Topic}】 消息=【{Encoding.UTF8.GetString(arg.ApplicationMessage.Payload)}】 qos等级=【{arg.ApplicationMessage.QualityOfServiceLevel}】");
            return Task.CompletedTask;
        }
        /// <summary>
        /// 启动后事件
        /// </summary>
        /// <param name="arg"></param>
        /// <returns></returns>
        private Task _mqttServer_StartedAsync(EventArgs arg)
        {
            Console.WriteLine($"StartedAsync:MQTT服务已启动……");
           return Task.CompletedTask;
        }
        /// <summary>
        /// 客户端取消订阅事件
        /// </summary>
        /// <param name="arg"></param>
        /// <returns></returns>
        private Task _mqttServer_ClientUnsubscribedTopicAsync(ClientUnsubscribedTopicEventArgs arg)
        {
            Console.WriteLine($"ClientUnsubscribedTopicAsync:客户端ID=【{arg.ClientId}】已取消订阅的主题=【{arg.TopicFilter}】  ");
            return Task.CompletedTask;
        }
        private Task _mqttServer_ApplicationMessageNotConsumedAsync(ApplicationMessageNotConsumedEventArgs arg)
        {
            Console.WriteLine($"ApplicationMessageNotConsumedAsync:发送端ID=【{arg.SenderId}】 Topic主题=【{arg.ApplicationMessage.Topic}】 消息=【{Encoding.UTF8.GetString(arg.ApplicationMessage.Payload)}】 qos等级=【{arg.ApplicationMessage.QualityOfServiceLevel}】");
            return Task.CompletedTask;
        }
        /// <summary>
        /// 客户端断开时候触发
        /// </summary>
        /// <param name="arg"></param>
        /// <returns></returns>
        /// <exception cref="NotImplementedException"></exception>
        private Task _mqttServer_ClientDisconnectedAsync(ClientDisconnectedEventArgs arg)
        {
            Console.WriteLine($"ClientDisconnectedAsync:客户端ID=【{arg.ClientId}】已断开, 地址=【{arg.Endpoint}】  ");
            return Task.CompletedTask;
        }
        /// <summary>
        /// 客户端连接时候触发
        /// </summary>
        /// <param name="arg"></param>
        /// <returns></returns>
        private Task _mqttServer_ClientConnectedAsync(ClientConnectedEventArgs arg)
        {
            Console.WriteLine($"ClientConnectedAsync:客户端ID=【{arg.ClientId}】已连接, 用户名=【{arg.UserName}】地址=【{arg.Endpoint}】  ");
            return Task.CompletedTask;
        }
        public Task StopAsync(CancellationToken cancellationToken)
        {
           return Task.CompletedTask;
        }
    }

 

MqttService:


 public class MqttService
    {
        public static MqttServer _mqttServer { get; set; }
        public static void PublishData(string data)
        {
            var message = new MqttApplicationMessage
            {
                Topic = "topic_01",
                Payload = Encoding.Default.GetBytes(data),
                QualityOfServiceLevel = MqttQualityOfServiceLevel.AtLeastOnce,
                Retain = true  // 服务端是否保留消息。true为保留,如果有新的订阅者连接,就会立马收到该消息。
            };
            _mqttServer.InjectApplicationMessage(new InjectedMqttApplicationMessage(message) // 发送消息给有订阅 topic_01的客户端
            {
                SenderClientId = "Server_01"
            }).GetAwaiter().GetResult();
        }
    }


37、最终的客户端代码:


MqttClientService:


public class MqttClientService
    {
        public static IMqttClient _mqttClient;
        public void MqttClientStart()
        {
            var optionsBuilder = new MqttClientOptionsBuilder()
                .WithTcpServer("127.0.0.1", 10086) // 要访问的mqtt服务端的 ip 和 端口号
                .WithCredentials("admin", "123456") // 要访问的mqtt服务端的用户名和密码
                .WithClientId("testclient02") // 设置客户端id
                .WithCleanSession()
                .WithTls(new MqttClientOptionsBuilderTlsParameters
                {
                    UseTls = false  // 是否使用 tls加密
                });
            var clientOptions = optionsBuilder.Build();
            _mqttClient = new MqttFactory().CreateMqttClient();
            _mqttClient.ConnectedAsync += _mqttClient_ConnectedAsync; // 客户端连接成功事件
            _mqttClient.DisconnectedAsync += _mqttClient_DisconnectedAsync; // 客户端连接关闭事件
            _mqttClient.ApplicationMessageReceivedAsync += _mqttClient_ApplicationMessageReceivedAsync; // 收到消息事件
            _mqttClient.ConnectAsync(clientOptions);
        }
        /// <summary>
        /// 客户端连接关闭事件
        /// </summary>
        /// <param name="arg"></param>
        /// <returns></returns>
        private Task _mqttClient_DisconnectedAsync(MqttClientDisconnectedEventArgs arg)
        {
            Console.WriteLine($"客户端已断开与服务端的连接……");
            return Task.CompletedTask;
        }
        /// <summary>
        /// 客户端连接成功事件
        /// </summary>
        /// <param name="arg"></param>
        /// <returns></returns>
        private Task _mqttClient_ConnectedAsync(MqttClientConnectedEventArgs arg)
        {
            Console.WriteLine($"客户端已连接服务端……");
            // 订阅消息主题
            // MqttQualityOfServiceLevel: (QoS):  0 最多一次,接收者不确认收到消息,并且消息不被发送者存储和重新发送提供与底层 TCP 协议相同的保证。
            // 1: 保证一条消息至少有一次会传递给接收方。发送方存储消息,直到它从接收方收到确认收到消息的数据包。一条消息可以多次发送或传递。
            // 2: 保证每条消息仅由预期的收件人接收一次。级别2是最安全和最慢的服务质量级别,保证由发送方和接收方之间的至少两个请求/响应(四次握手)。
            _mqttClient.SubscribeAsync("topic_02", MqttQualityOfServiceLevel.AtLeastOnce);
            return Task.CompletedTask;
        }
        /// <summary>
        /// 收到消息事件
        /// </summary>
        /// <param name="arg"></param>
        /// <returns></returns>
        private Task _mqttClient_ApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs arg)
        {
            Console.WriteLine($"ApplicationMessageReceivedAsync:客户端ID=【{arg.ClientId}】接收到消息。 Topic主题=【{arg.ApplicationMessage.Topic}】 消息=【{Encoding.UTF8.GetString(arg.ApplicationMessage.Payload)}】 qos等级=【{arg.ApplicationMessage.QualityOfServiceLevel}】");
            return Task.CompletedTask;
        }
        public void Publish(string data)
        {
            var message = new MqttApplicationMessage
            {
                Topic = "topic_02",
                Payload = Encoding.Default.GetBytes(data),
                QualityOfServiceLevel = MqttQualityOfServiceLevel.AtLeastOnce,
                Retain = true  // 服务端是否保留消息。true为保留,如果有新的订阅者连接,就会立马收到该消息。
            };
            _mqttClient.PublishAsync(message);
        }
    }

 

38、后记:MQTT以上演示已经完毕,可以看到它的一些特性,跟websocket很接近,但是又比websocket通信更加灵活。其实,实际上MQTT的客户端在现实生产环境场景下,并不需要咱们开发者进行开发,很多硬件设备都支持提供MQTT协议的通信客户端,所以只需要自己搭建一个服务端,就可以实现实时监控各种设备推送过来的各种信号数据。同时客户端支持发布消息给其他客户端,所以就实现了设备与设备之间的一对一信号通信的效果了。如果需要下发信号给硬件设备,MQTT服务端也可以直接下发给某个指定设备来进行实现即可。上面案例只提供入门方案,如果有感兴趣的大佬,可以自己去拓展一下,来达到更好的效果。


相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
30天前
|
存储 消息中间件 安全
JUC组件实战:实现RRPC(Java与硬件通过MQTT的同步通信)
【10月更文挑战第9天】本文介绍了如何利用JUC组件实现Java服务与硬件通过MQTT的同步通信(RRPC)。通过模拟MQTT通信流程,使用`LinkedBlockingQueue`作为消息队列,详细讲解了消息发送、接收及响应的同步处理机制,包括任务超时处理和内存泄漏的预防措施。文中还提供了具体的类设计和方法实现,帮助理解同步通信的内部工作原理。
JUC组件实战:实现RRPC(Java与硬件通过MQTT的同步通信)
|
20天前
|
安全 算法 编译器
.NET 9 AOT的突破 - 支持老旧Win7与XP环境
【10月更文挑战第30天】在.NET 9 中,AOT(Ahead-of-Time)编译技术在支持老旧的 Windows 7 和 XP 系统方面取得了显著进展。主要突破包括:性能提升(启动速度加快、执行效率提高)、部署优化(无需安装.NET 运行时、减小应用程序体积)、兼容性保障(编译策略优化、依赖项管理改进)以及安全性增强(代码保护机制)。这些改进使得应用程序在老旧系统上运行更加流畅、高效和安全。
|
20天前
|
XML 安全 API
.NET 9 AOT的突破 - 支持老旧Win7与XP环境
.NET 9开始,AOT支持Win7和XP,不仅仅只支持SP1版本
.NET 9 AOT的突破 - 支持老旧Win7与XP环境
|
25天前
|
安全 Java 网络安全
Android远程连接和登录FTPS服务代码(commons.net库)
Android远程连接和登录FTPS服务代码(commons.net库)
22 1
|
1月前
|
网络协议 Unix Linux
一个.NET开源、快速、低延迟的异步套接字服务器和客户端库
一个.NET开源、快速、低延迟的异步套接字服务器和客户端库
|
1月前
|
存储 消息中间件 NoSQL
Redis 入门 - C#.NET Core客户端库六种选择
Redis 入门 - C#.NET Core客户端库六种选择
59 8
|
1月前
|
前端开发 JavaScript C#
CodeMaid:一款基于.NET开发的Visual Studio代码简化和整理实用插件
CodeMaid:一款基于.NET开发的Visual Studio代码简化和整理实用插件
|
2月前
|
消息中间件 Kafka 数据安全/隐私保护
RabbitMQ异步通信详解
RabbitMQ异步通信详解
90 16
|
3月前
|
Kubernetes 监控 Devops
【独家揭秘】.NET项目中的DevOps实践:从代码提交到生产部署,你不知道的那些事!
【8月更文挑战第28天】.NET 项目中的 DevOps 实践贯穿代码提交到生产部署全流程,涵盖健壮的源代码管理、GitFlow 工作流、持续集成与部署、容器化及监控日志记录。通过 Git、CI/CD 工具、Kubernetes 及日志框架的最佳实践应用,显著提升软件开发效率与质量。本文通过具体示例,助力开发者构建高效可靠的 DevOps 流程,确保项目成功交付。
75 0
|
3月前
|
XML 开发框架 .NET
.NET框架:软件开发领域的瑞士军刀,如何让初学者变身代码艺术家——从基础架构到独特优势,一篇不可错过的深度解读。
【8月更文挑战第28天】.NET框架是由微软推出的统一开发平台,支持多种编程语言,简化应用程序的开发与部署。其核心组件包括公共语言运行库(CLR)和类库(FCL)。CLR负责内存管理、线程管理和异常处理等任务,确保代码稳定运行;FCL则提供了丰富的类和接口,涵盖网络、数据访问、安全性等多个领域,提高开发效率。此外,.NET框架还支持跨语言互操作,允许开发者使用C#、VB.NET等语言编写代码并无缝集成。这一框架凭借其强大的功能和广泛的社区支持,已成为软件开发领域的重要工具,适合初学者深入学习以奠定职业生涯基础。
102 1