基于EMQ2.0的学习和使用简单分享

简介: EMQ介绍   EMQ (Erlang/Enterprise/Elastic MQTT Broker) 是基于 Erlang/OTP 平台开发的开源物联网 MQTT 消息服务器。Erlang/OTP 是出色的软实时(Soft-Realtime)、低延时(Low-Latency)、分布式(Distributed) 的语言平台。

EMQ介绍

  EMQ (Erlang/Enterprise/Elastic MQTT Broker) 是基于 Erlang/OTP 平台开发的开源物联网 MQTT 消息服务器。Erlang/OTP 是出色的软实时(Soft-Realtime)、低延时(Low-Latency)、分布式(Distributed) 的语言平台。MQTT 是轻量的(Lightweight)、发布订阅模式(PubSub) 的物联网(IoT)消息协议。
 
订阅(pub)/发布(sub)模式
 
消息队列中的广播(fanout)模式
 
 
轻量化:docker镜像都才88.4MB
 
 

一些关于常用EMQTT的快速链接:

官网API地址: http://www.emqtt.com/docs/v2/
开源项目地址: https://github.com/emqtt
MQTT介绍和场景: https://www.mqtt.com/
 

安装过程

假如你的centos上已经安装了Docker,并pull了devicexx/emqttd这个镜像,输入如下命令
docker run -dit --name=sample_emqtt --restart=always -p 18083:18083 -p 1883:1883 -p 8083:8083 -p 8883:8883 998429a869e8

 

 
确保映射如下几个端口
1883、8083、8883:这三个是基于EMQTT传输通讯的端口
18083:这个是EMQTT Web控制台的端口
 
我的虚拟机IP网段是153.132,直接输入http://192.168.153.132:18083进入Web控制台,默认用户名和密码是admin, public
至此EMQTT已经顺利安装完成。
 

在NET Core中使用EMQ:

emqtt官网没有提供.net的客户端,在nuget和github上找到了实现了MQTT协议的公共组件。
 
 

MQTTnet Client的几个主要事件:

ApplicationMessageReceived:MQTTnet中主要方法事件,当接收到消息的时候,该事件触发。包含Topic,Payload,Qos,Retain主要成员。
Connected:客户端成功连接时触发。
Disconnected:客户端丢失连接时触发。
 

创建一个发布者

先创建一个发布者Publisher的配置类,代码简单:
    public static class PublisherConfig
    {
        /// <summary>
        /// 创建mqtt的客户端接口实例
        /// </summary>
        public static readonly IMqttClient Client = new MqttFactory().CreateMqttClient();

        /// <summary>
        /// 创建mqtt配置选项
        /// </summary>
        public static readonly MqttClientOptions ClientOptions = new MqttClientOptions
        {
            // 通道选项
            ChannelOptions = new MqttClientTcpOptions
            {
                Server = "192.168.153.132"
            },
            
            /*
             * 客户端ID
             * 在MQTTnet框架中,当ClientId未赋值,将使用默认的GUID生成默认的ClientId
             */
            ClientId = "Client_publisher",
            
            // 客户端认证
            Credentials = new MqttClientCredentials
            {
                Username = "clientUser_01",
                Password = "123123"
            }
        };

        public const string Topic = MQTT_Common.Config.Topic.Name;
    }

 

 为了测试方便,创建了一个全局固定的Topic名称

    public static class Topic
    {
        public const string Name = "/WorldTopic";
    }

 

再创建一个发布者Publisher,代码简单

    public static class Publisher
    {
        private static readonly IMqttClient Client = PublisherConfig.Client;
        private static readonly MqttClientOptions ClientOptions = PublisherConfig.ClientOptions;
        private static readonly string Topic = PublisherConfig.Topic;

        public static void RunAsync()
        {
            try
            {
                Console.WriteLine("publisher is running");
                CreateConnection();
                LoopInput().Wait();
            }
            catch (Exception exception)
            {
                Console.WriteLine(exception);
            }
        }
        
        private static void CreateConnection()
        {
            CommonEventHandler.EventHandler(Client, ClientOptions);
            CommonEventHandler.TryConnectionAsync(Client, ClientOptions);
        }
        
        private static async Task LoopInput()
        {
            while (true)
            {
                await Client.SubscribeAsync("/World");

                Console.WriteLine("输入消息数据:");
                var r = Console.ReadLine();

                var applicationMessage = new MqttApplicationMessageBuilder()
                    .WithTopic(Topic)         // 设置主题
                    .WithPayload(r)           // 设置载荷(消息内容)
                    .WithAtLeastOnceQoS()     // 设置质量
                    .WithRetainFlag(false)    // 设置持久化
                    .Build();

                await Client.PublishAsync(applicationMessage);
            }   
        }

 

 创建一个或多个订阅者

    public static class SubscriberConfig01
    {
        public static readonly IMqttClient Client = new MqttFactory().CreateMqttClient();

        public static readonly MqttClientOptions ClientOptions = new MqttClientOptions
        {
            ChannelOptions = new MqttClientTcpOptions
            {
                Server = "192.168.153.132"
            },
            // 唯一需要修改的Client唯一ID
            ClientId = "Client_01",
            Credentials = new MqttClientCredentials
            {
                // 用户可用多个,也可不启用客户端验证
                Username = "clientUser_02",
                Password = "123123"
            }
        };

        public const string Topic = MQTT_Common.Config.Topic.Name;
    }

 

MQTTnet中所有连接端都是Client,所以唯一的变化是ClientID这个值,可使用系统GUID自动生成,也可以使用不同的Client限定名。

    public static class Subscriber01
    {
        private static readonly IMqttClient Client = SubscriberConfig01.Client;
        private static readonly MqttClientOptions ClientOptions = SubscriberConfig01.ClientOptions;
        private static readonly string Topic = SubscriberConfig01.Topic;

        public static async Task RunAsync()
        {
            try
            {
                Console.WriteLine("subscriber 1 is running");
                CreateConnection();
                // 注册/订阅Topic主题
                await Client.SubscribeAsync(Topic);
                Console.ReadKey();
            }
            catch (Exception exception)
            {
                Console.WriteLine(exception);
            }
        }

        private static void CreateConnection()
        {
            CommonEventHandler.EventHandler(Client, ClientOptions);
            CommonEventHandler.TryConnectionAsync(Client, ClientOptions);
        }
    }

 

公共函数CommonEventHandler,用于公共事件处理和连接尝试

    public static class CommonEventHandler
    {
        /// <summary>
        /// 添加Client相关事件处理函数
        /// </summary>
        /// <param name="client">IMqttClient客户端</param>
        /// <param name="clientOptions">MqttClientOptions配置选项</param>
        /// <param name="subscribe">主题Topic名称</param>
        public static void EventHandler(IMqttClient client, MqttClientOptions clientOptions, string subscribe = "#")
        {
            client.ApplicationMessageReceived += (s, e) =>
            {
                Console.WriteLine("### 收到程序消息 ###");
                Console.WriteLine($"+ [主题]Topic = {e.ApplicationMessage.Topic}");
                Console.WriteLine($"+ [载荷]Payload = {Encoding.UTF8.GetString(e.ApplicationMessage.Payload)}");
                Console.WriteLine($"+ [质量]QoS = {e.ApplicationMessage.QualityOfServiceLevel}");
                Console.WriteLine($"+ [持久]Retain = {e.ApplicationMessage.Retain}");
                Console.WriteLine();
            };

            client.Connected += async (s, e) =>
            {
                Console.WriteLine("### 成功连接MQTT ###");
                if (!subscribe.Equals("#")) return;
                await client.SubscribeAsync(new TopicFilterBuilder().WithTopic("#").Build());
                Console.WriteLine($"### 成功订阅主题[{subscribe}] ###");
            };

            client.Disconnected += async (s, e) =>
            {
                Console.WriteLine("### 连接丢失 ###");
                await Task.Delay(TimeSpan.FromSeconds(5));
                try
                {
                    await client.ConnectAsync(clientOptions);
                }
                catch
                {
                    Console.WriteLine("### 连接错误 ###");
                }
            };
        }
        
        /// <summary>
        /// 尝试Client连接到EMQTT
        /// </summary>
        /// <param name="client">IMqttClient客户端</param>
        /// <param name="clientOptions">MqttClientOptions配置选项</param>
        public static void TryConnectionAsync(IMqttClient client, MqttClientOptions clientOptions)
        {
            try
            {
                client.ConnectAsync(clientOptions).Wait();
            }
            catch (Exception exception)
            {
                Console.WriteLine("### 连接错误 ###" + Environment.NewLine + exception);
            }
        }
    }

 

配置三个客户端,一个作为发布者,两个作为订阅者,通过发布者输入任意数据,运行结果如下:

其中发布者客户端会收到一个Retain=True的消息,是因为之前有发布过一条持久化的消息,该消息已经存储在EMQTT中,每次启动均会主动向订阅过"/World"的订阅者再次推送该条持久化的消息。
根据生产消费的机制,此处消息理当已经被消费和清除,但不清楚为何EMQ仍然存在。
 

总结

本篇简单介绍使用EMQTT的PubSub模式,并使用MQTTnet客户端连接并订阅和发布消息内容。
 
 
相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
6月前
|
数据安全/隐私保护
mqtt学习记录
mqtt学习记录
49 0
|
负载均衡 物联网 网络性能优化
EMQ是什么 ?
EMQ(Erlang MQTT Broker)是一个基于 Erlang/OTP 平台开发的开源 MQTT(Message Queuing Telemetry Transport)消息代理服务器。
2037 0
|
消息中间件 Ubuntu Shell
ubuntu安装rabbitmq教程 避坑
ubuntu安装rabbitmq教程 避坑
489 0
|
消息中间件 存储 NoSQL
RabbitMQ入门指南:初学者也能读懂的教程(五)
RabbitMQ入门指南:初学者也能读懂的教程
146 0
|
消息中间件 存储 JSON
从零开始搭建物联网平台(四)EMQ-X消息中间件
从零开始搭建物联网平台(四)EMQ-X消息中间件
1037 0
|
存储 运维 Linux
【Linux运维】有关服务器
【Linux运维】有关服务器
|
运维 Linux 虚拟化
【Linux运维】安装——配置网络
【Linux运维】安装——配置网络
137 0
|
消息中间件 存储 监控
超详细:这份全网首发的Kafka技术手册,从基础到实战一应俱全
Kafka正在爆炸式增长。超过三分之一的财富500强企业都使用Kafka。这些公司包括十大旅游公司,十大银行中的七家,十大保险公司中的八家,十大电信公司中的九家,以及更多。LinkedIn,微软和Netflix每天使用Kafka(1,000,000,000,000)处理万亿级的消息。Kafka用于实时数据流,收集大数据或进行实时分析(或两者兼而有之)。Kafka与内存微服务一起使用以提供可靠性,它可用于向 CEP(复杂事件流系统)和IoT / IFTTT式自动化系统提供事件。
|
物联网 网络性能优化 开发者
MQTT -基于 mosquitto 开源软件应用开发介绍 | 学习笔记
快速学习 MQTT-基于 mosquitto 开源软件应用开发介绍
MQTT -基于 mosquitto 开源软件应用开发介绍 | 学习笔记
|
Ubuntu
UBUNTU安装EMQ
UBUNTU安装EMQ
103 0