golang开源的可嵌入应用程序高性能的MQTT服务

简介: golang开源的可嵌入应用程序高性能的MQTT服务

1
2
docker部署
可以从 Docker Hub 仓库中拉取并运行Mochi MQTT官方镜像:

docker pull mochimqtt/server
或者
docker run mochimqtt/server
1
2
3
也提供了一个简单的 Dockerfile,用于运行 cmd/main.go 中的 Websocket(:1882)、TCP(:1883) 和服务端状态信息(:8080)这三个服务监听:

docker build -t mochi:latest .
docker run -p 1883:1883 -p 1882:1882 -p 8080:8080 mochi:latest
1
2
嵌入自己项目运行和开发
下载Mochi MQTT包

go get github.com/mochi-mqtt/server/v2
1
将Mochi MQTT作为包导入使用, 示例代码如下

import (
mqttServer "github.com/mochi-mqtt/server/v2"
"github.com/mochi-mqtt/server/v2/listeners"
"github.com/mochi-mqtt/server/v2/packets"
)

var Server *mqttServer.Server

func ServerMqttInit() {
// 创建新的 MQTT 服务器。
Server = mqttServer.New(&mqttServer.Options{
InlineClient: true, // 启动内联客户端
})

// 初始化数据库实例
edge := &edgeHook{deviceDao: deviceDao.NewDeviceRepository(),
    productDao:     productDao.NewProductRepository(),
}
// 添加自定义权限方法
err := Server.AddHook(edge, nil)
if err != nil {
    log.Fatal(err)
}

// 在1883端口上创建一个 TCP 服务端。
tcp := listeners.NewTCP("t1", ":1883", nil)
err = Server.AddListener(tcp)
if err != nil {
    log.Fatal(err)
}

// 在1882端口上创建一个 Websocket 服务端。
ws := listeners.NewWebsocket("ws1", ":1882", nil)
err = server.AddListener(ws)
if err != nil {
    log.Fatal(err)
}

go func() {
    err := Server.Serve()
    if err != nil {
        log.Fatal(err)
    }
}()

}

type edgeHook struct {
mqttServer.HookBase
deviceDao deviceDao.DeviceRepository
productDao productDao.ProductRepository
}

func (h *edgeHook) ID() string {
return "mqtt-auth"
}

func (h *edgeHook) Provides(b byte) bool {
// 实现钩子函数
return bytes.Contains([]byte{
//MQTT连接时认证. 当用户尝试与服务器进行身份验证时调用。
mqttServer.OnConnectAuthenticate,
//MQTT topic权限控制. 当用户尝试发布或订阅主题时调用,用来检测ACL规则。
mqttServer.OnACLCheck,
//在新客户端连接并进行身份验证后,会立即调用此方法,并在会话建立和发送CONNACK之前立即调用。
mqttServer.OnSessionEstablish,
//当客户端因任何原因断开连接时调用。
mqttServer.OnDisconnect,
//当客户端向订阅者发布消息后调用。
mqttServer.OnPublished,
}, []byte{b})
}

// OnConnectAuthenticate MQTT连接时认证. 当用户尝试与服务器进行身份验证时调用。
func (h edgeHook) OnConnectAuthenticate(cl mqttServer.Client, pk packets.Packet) bool {
username := string(pk.Connect.Username)
password := string(pk.Connect.Password)
if username == "" || len(username) == 0 {
return false
}
if password == "" || len(password) == 0 {
return false
}
return true
}

// OnACLCheck MQTT topic权限控制. 当用户尝试发布或订阅主题时调用,用来检测ACL规则。
func (h edgeHook) OnACLCheck(cl mqttServer.Client, topic string, write bool) bool {
username := string(cl.Properties.Username)
if username == "" || len(username) == 0 {
return false
}
if topic == "" || len(topic) == 0 {
return false
}
return true
}

// OnSessionEstablish 在新客户端连接并进行身份验证后,会立即调用此方法,并在会话建立和发送CONNACK之前立即调用。
func (h edgeHook) OnSessionEstablish(cl mqttServer.Client, pk packets.Packet) {
username := string(cl.Properties.Username)
if username == "" || len(username) == 0 {
return
}
//设备连接MQTT成功后保存设备在线状态
}

// OnDisconnect 当客户端因任何原因断开连接时调用。
func (h edgeHook) OnDisconnect(cl mqttServer.Client, err error, expire bool) {
username := string(cl.Properties.Username)
if username == "" || len(username) == 0 {
return
}
//设备断开MQTT成功后保存设备离线状态
}

// OnPublished 当客户端向订阅者发布消息后调用。
func (h edgeHook) OnPublished(cl mqttServer.Client, pk packets.Packet) {
Log.Infof("mqtt server OnPublished info topic=%s, msg=%s", pk.TopicName, string(pk.Payload))
//收到客户端消息后做业务逻辑处理
}

// 使用内联客户端方式,向MQTT发送消息
func PublishMsg(topic string, msg []byte) bool {
err := Server.Publish(topic, msg, false, 0)
if err != nil {
Log.Errorf("mqtt EdgePublish error=%v, topic=%s, msg=%s", err, topic, msg)
return false
}
return true
}

// 使用内联客户端方式,订阅边缘MQTT消息topic
func SubscribeTopic(topic string, subscriptionId int, callback func(topic string, msg []byte)) {
callbackFn := func(cl *mqttServer.Client, sub packets.Subscription, pk packets.Packet) {
Log.Info("mqtt EdgeSubscribe received message", "client", cl.ID, "subscriptionId", sub.Identifier,
"topic", pk.TopicName, "payload", string(pk.Payload))
callback(pk.TopicName, pk.Payload)
}
_ = Server.Subscribe(topic, subscriptionId, callbackFn)
}

// 使用内联客户端方式,取消订阅边缘MQTT消息topic
func UnsubscribeTopic(topic string, subscriptionId int) {
_ = Server.Unsubscribe(topic, subscriptionId)
}

func main() {
// 创建信号用于等待服务端关闭信号
sigs := make(chan os.Signal, 1)
done := make(chan bool, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
go func() {
<-sigs
done <- true
}()

<-done
Log.Error("caught signal, stopping...")
Server.Close()
Log.Error("main.go finished")
}

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
4月前
|
消息中间件 运维 Serverless
商业版vs开源版:一图看懂云消息队列 RocketMQ 版核心优势
自建开源 RocketMQ 集群,为保证业务稳定性,往往需要按照业务请求的峰值去配置集群资源。云消息队列 RocketMQ 版 Serverless 实例通过资源快速伸缩,实现资源使用量与实际业务负载贴近,并按实际使用量计费,有效降低企业的运维压力和使用成本。
283 16
|
1月前
|
人工智能 开发框架 数据可视化
Eino:字节跳动开源基于Golang的AI应用开发框架,组件化设计助力构建AI应用
Eino 是字节跳动开源的大模型应用开发框架,帮助开发者高效构建基于大模型的 AI 应用。支持组件化设计、流式处理和可视化开发工具。
226 27
|
9天前
|
消息中间件 人工智能 自然语言处理
基于 RocketMQ 事件驱动架构的 AI 应用实践
基于 RocketMQ 事件驱动架构的 AI 应用实践
|
9天前
|
消息中间件 存储 Apache
恭喜 Apache RocketMQ 荣获 2024 开源创新榜单“年度开源项目”
恭喜 Apache RocketMQ 荣获 2024 开源创新榜单“年度开源项目”
|
2月前
|
消息中间件 存储 Apache
恭喜 Apache RocketMQ、Apache Seata 荣获 2024 开源创新榜单“年度开源项目”
近日,以“新纪天工、开物焕彩——致敬开源的力量”为活动主题的“重大科技成就发布会(首场)”在国家科技传播中心成功举办,并隆重揭晓了 2024 开源创新榜单,旨在致敬中国开源力量,传播推广开源科技成就,营造中国开源创新生态。2024 年开源创新榜单由中国科协科学技术传播中心、中国计算机学会、中国通信学会、中国科学院软件研究所共同主办,中国开发者社区承办,以王怀民院士为首组建评审委员会,进行研讨评审,面向中国开源行业领域,遴选具有创新性、贡献度和影响力的开源项目、社区、应用场景与开源事件。在评审出的 10 个年度开源项目中,Apache RocketMQ、Apache Seata 成功入选。
101 13
|
3月前
|
消息中间件 存储 监控
说说MQ在你项目中的应用(一)
本文总结了消息队列(MQ)在项目中的应用,主要围绕异步处理、系统解耦和流量削峰三大功能展开。通过分析短信通知和业务日志两个典型场景,介绍了MQ的实现方式及其优势。短信通知中,MQ用于异步发送短信并处理状态更新;业务日志中,Kafka作为高吞吐量的消息系统,负责收集和传输系统及用户行为日志,确保数据的可靠性和高效处理。MQ不仅提高了系统的灵活性和响应速度,还提供了重试机制和状态追踪等功能,保障了业务的稳定运行。
110 7
|
4月前
|
消息中间件 弹性计算 运维
一图看懂云消息队列 RabbitMQ 版对比开源优势
一张图带您快速了解云消息队列 RabbitMQ 版对比开源版本的显著优势。
103 14
|
3月前
|
消息中间件 存储 中间件
说说MQ在你项目中的应用(二)商品支付
本文总结了消息队列(MQ)在支付订单业务中的应用,重点分析了RabbitMQ的优势。通过异步处理、系统解耦和流量削峰等功能,RabbitMQ确保了支付流程的高效与稳定。具体场景包括用户下单、支付请求、商品生产和物流配送等环节。相比Kafka,RabbitMQ在低吞吐量、高实时性需求下表现更优,提供了更低延迟和更高的可靠性。
90 0
|
4月前
|
消息中间件 存储 Apache
探索 RocketMQ:企业级消息中间件的选择与应用
RocketMQ 是一个高性能、高可靠、可扩展的分布式消息中间件,它是由阿里巴巴开发并贡献给 Apache 软件基金会的一个开源项目。RocketMQ 主要用于处理大规模、高吞吐量、低延迟的消息传递,它是一个轻量级的、功能强大的消息队列系统,广泛应用于金融、电商、日志系统、数据分析等领域。
258 0
探索 RocketMQ:企业级消息中间件的选择与应用
|
4月前
|
存储 算法 安全
FreeMQTT:一款Python语言实现的开源MQTT Server
FreeMQTT 是一款用 Python 语言并基于 Tornado 开发的开源 MQTT 服务器,支持 MQTT3.1.1 和 MQTT5.0 协议,提供多租户安全隔离、高效 Topic 匹配算法及实时上下线通知等功能,适用于 IoT 场景。快速启动仅需克隆仓库、安装依赖并运行服务。