golang开源的可嵌入应用程序高性能的MQTT服务

简介: golang开源的可嵌入应用程序高性能的MQTT服务

1
2
docker部署
可以从 Docker Hub 仓库中拉取并运行Mochi MQTT官方镜像:

docker pull mochimqtt/server
或者
docker run mochimqtt/server
1
2
3
也提供了一个简单的 Dockerfile,用于运行 cmd/main.go 中的 Websocket(:1882)、TCP(:1883) 和服务端状态信息(:8080)这三个服务监听:

docker build -t mochi:latest .
docker run -p 1883:1883 -p 1882:1882 -p 8080:8080 mochi:latest
1
2
嵌入自己项目运行和开发
下载Mochi MQTT包

go get github.com/mochi-mqtt/server/v2
1
将Mochi MQTT作为包导入使用, 示例代码如下

import (
mqttServer "github.com/mochi-mqtt/server/v2"
"github.com/mochi-mqtt/server/v2/listeners"
"github.com/mochi-mqtt/server/v2/packets"
)

var Server *mqttServer.Server

func ServerMqttInit() {
// 创建新的 MQTT 服务器。
Server = mqttServer.New(&mqttServer.Options{
InlineClient: true, // 启动内联客户端
})

// 初始化数据库实例
edge := &edgeHook{deviceDao: deviceDao.NewDeviceRepository(),
    productDao:     productDao.NewProductRepository(),
}
// 添加自定义权限方法
err := Server.AddHook(edge, nil)
if err != nil {
    log.Fatal(err)
}

// 在1883端口上创建一个 TCP 服务端。
tcp := listeners.NewTCP("t1", ":1883", nil)
err = Server.AddListener(tcp)
if err != nil {
    log.Fatal(err)
}

// 在1882端口上创建一个 Websocket 服务端。
ws := listeners.NewWebsocket("ws1", ":1882", nil)
err = server.AddListener(ws)
if err != nil {
    log.Fatal(err)
}

go func() {
    err := Server.Serve()
    if err != nil {
        log.Fatal(err)
    }
}()

}

type edgeHook struct {
mqttServer.HookBase
deviceDao deviceDao.DeviceRepository
productDao productDao.ProductRepository
}

func (h *edgeHook) ID() string {
return "mqtt-auth"
}

func (h *edgeHook) Provides(b byte) bool {
// 实现钩子函数
return bytes.Contains([]byte{
//MQTT连接时认证. 当用户尝试与服务器进行身份验证时调用。
mqttServer.OnConnectAuthenticate,
//MQTT topic权限控制. 当用户尝试发布或订阅主题时调用,用来检测ACL规则。
mqttServer.OnACLCheck,
//在新客户端连接并进行身份验证后,会立即调用此方法,并在会话建立和发送CONNACK之前立即调用。
mqttServer.OnSessionEstablish,
//当客户端因任何原因断开连接时调用。
mqttServer.OnDisconnect,
//当客户端向订阅者发布消息后调用。
mqttServer.OnPublished,
}, []byte{b})
}

// OnConnectAuthenticate MQTT连接时认证. 当用户尝试与服务器进行身份验证时调用。
func (h edgeHook) OnConnectAuthenticate(cl mqttServer.Client, pk packets.Packet) bool {
username := string(pk.Connect.Username)
password := string(pk.Connect.Password)
if username == "" || len(username) == 0 {
return false
}
if password == "" || len(password) == 0 {
return false
}
return true
}

// OnACLCheck MQTT topic权限控制. 当用户尝试发布或订阅主题时调用,用来检测ACL规则。
func (h edgeHook) OnACLCheck(cl mqttServer.Client, topic string, write bool) bool {
username := string(cl.Properties.Username)
if username == "" || len(username) == 0 {
return false
}
if topic == "" || len(topic) == 0 {
return false
}
return true
}

// OnSessionEstablish 在新客户端连接并进行身份验证后,会立即调用此方法,并在会话建立和发送CONNACK之前立即调用。
func (h edgeHook) OnSessionEstablish(cl mqttServer.Client, pk packets.Packet) {
username := string(cl.Properties.Username)
if username == "" || len(username) == 0 {
return
}
//设备连接MQTT成功后保存设备在线状态
}

// OnDisconnect 当客户端因任何原因断开连接时调用。
func (h edgeHook) OnDisconnect(cl mqttServer.Client, err error, expire bool) {
username := string(cl.Properties.Username)
if username == "" || len(username) == 0 {
return
}
//设备断开MQTT成功后保存设备离线状态
}

// OnPublished 当客户端向订阅者发布消息后调用。
func (h edgeHook) OnPublished(cl mqttServer.Client, pk packets.Packet) {
Log.Infof("mqtt server OnPublished info topic=%s, msg=%s", pk.TopicName, string(pk.Payload))
//收到客户端消息后做业务逻辑处理
}

// 使用内联客户端方式,向MQTT发送消息
func PublishMsg(topic string, msg []byte) bool {
err := Server.Publish(topic, msg, false, 0)
if err != nil {
Log.Errorf("mqtt EdgePublish error=%v, topic=%s, msg=%s", err, topic, msg)
return false
}
return true
}

// 使用内联客户端方式,订阅边缘MQTT消息topic
func SubscribeTopic(topic string, subscriptionId int, callback func(topic string, msg []byte)) {
callbackFn := func(cl *mqttServer.Client, sub packets.Subscription, pk packets.Packet) {
Log.Info("mqtt EdgeSubscribe received message", "client", cl.ID, "subscriptionId", sub.Identifier,
"topic", pk.TopicName, "payload", string(pk.Payload))
callback(pk.TopicName, pk.Payload)
}
_ = Server.Subscribe(topic, subscriptionId, callbackFn)
}

// 使用内联客户端方式,取消订阅边缘MQTT消息topic
func UnsubscribeTopic(topic string, subscriptionId int) {
_ = Server.Unsubscribe(topic, subscriptionId)
}

func main() {
// 创建信号用于等待服务端关闭信号
sigs := make(chan os.Signal, 1)
done := make(chan bool, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
go func() {
<-sigs
done <- true
}()

<-done
Log.Error("caught signal, stopping...")
Server.Close()
Log.Error("main.go finished")
}

相关实践学习
快速体验阿里云云消息队列RocketMQ版
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
9月前
|
中间件 Go
Golang | Gin:net/http与Gin启动web服务的简单比较
总的来说,`net/http`和 `Gin`都是优秀的库,它们各有优缺点。你应该根据你的需求和经验来选择最适合你的工具。希望这个比较可以帮助你做出决策。
491 35
|
运维 监控 Cloud Native
一行代码都不改,Golang 应用链路指标日志全知道
本文将通过阿里云开源的 Golang Agent,帮助用户实现“一行代码都不改”就能获取到应用产生的各种观测数据,同时提升运维团队和研发团队的幸福感。
698 131
|
11月前
|
人工智能 开发框架 数据可视化
Eino:字节跳动开源基于Golang的AI应用开发框架,组件化设计助力构建AI应用
Eino 是字节跳动开源的大模型应用开发框架,帮助开发者高效构建基于大模型的 AI 应用。支持组件化设计、流式处理和可视化开发工具。
1923 27
|
算法 安全 测试技术
golang 栈数据结构的实现和应用
本文详细介绍了“栈”这一数据结构的特点,并用Golang实现栈。栈是一种FILO(First In Last Out,即先进后出或后进先出)的数据结构。文章展示了如何用slice和链表来实现栈,并通过golang benchmark测试了二者的性能差异。此外,还提供了几个使用栈结构解决的实际算法问题示例,如有效的括号匹配等。
298 1
golang 栈数据结构的实现和应用
|
前端开发 中间件 Go
实践Golang语言N层应用架构
【10月更文挑战第2天】本文介绍了如何在Go语言中使用Gin框架实现N层体系结构,借鉴了J2EE平台的多层分布式应用程序模型。文章首先概述了N层体系结构的基本概念,接着详细列出了Go语言中对应的构件名称,包括前端框架(如Vue.js、React)、Gin的处理函数和中间件、依赖注入和配置管理、会话管理和ORM库(如gorm或ent)。最后,提供了具体的代码示例,展示了如何实现HTTP请求处理、会话管理和数据库操作。
267 0
|
中间件 Go 数据处理
应用golang的管道-过滤器架构风格
【10月更文挑战第1天】本文介绍了一种面向数据流的软件架构设计模式——管道-过滤器(Pipe and Filter),并通过Go语言的Gin框架实现了一个Web应用示例。该模式通过将数据处理流程分解为一系列独立的组件(过滤器),并利用管道连接这些组件,实现了模块化、可扩展性和高效的分布式处理。文中详细讲解了Gin框架的基本使用、中间件的应用以及性能优化方法,展示了如何构建高性能的Web服务。
361 1
|
存储 监控 Go
面向OpenTelemetry的Golang应用无侵入插桩技术
文章主要讲述了阿里云 ARMS 团队与程序语言与编译器团队合作研发的面向OpenTelemetry的Golang应用无侵入插桩技术解决方案,旨在解决Golang应用监控的挑战。
1096 9
|
6月前
|
消息中间件 数据管理 Serverless
阿里云消息队列 Apache RocketMQ 创新论文入选顶会 ACM FSE 2025
阿里云消息团队基于 Apache RocketMQ 构建 Serverless 消息系统,适配多种主流消息协议(如 RabbitMQ、MQTT 和 Kafka),成功解决了传统中间件在可伸缩性、成本及元数据管理等方面的难题,并据此实现 ApsaraMQ 全系列产品 Serverless 化,助力企业提效降本。
|
4月前
|
消息中间件 Java Kafka
消息队列比较:Spring 微服务中的 Kafka 与 RabbitMQ
本文深入解析了 Kafka 和 RabbitMQ 两大主流消息队列在 Spring 微服务中的应用与对比。内容涵盖消息队列的基本原理、Kafka 与 RabbitMQ 的核心概念、各自优势及典型用例,并结合 Spring 生态的集成方式,帮助开发者根据实际需求选择合适的消息中间件,提升系统解耦、可扩展性与可靠性。
344 1
消息队列比较:Spring 微服务中的 Kafka 与 RabbitMQ
|
消息中间件 JSON Java
开发者如何使用轻量消息队列MNS
【10月更文挑战第19天】开发者如何使用轻量消息队列MNS
943 90

推荐镜像

更多