1
2
docker部署
可以从 Docker Hub 仓库中拉取并运行Mochi MQTT官方镜像:
docker pull mochimqtt/server
或者
docker run mochimqtt/server
1
2
3
也提供了一个简单的 Dockerfile,用于运行 cmd/main.go 中的 Websocket(:1882)、TCP(:1883) 和服务端状态信息(:8080)这三个服务监听:
docker build -t mochi:latest .
docker run -p 1883:1883 -p 1882:1882 -p 8080:8080 mochi:latest
1
2
嵌入自己项目运行和开发
下载Mochi MQTT包
go get github.com/mochi-mqtt/server/v2
1
将Mochi MQTT作为包导入使用, 示例代码如下
import (
mqttServer "github.com/mochi-mqtt/server/v2"
"github.com/mochi-mqtt/server/v2/listeners"
"github.com/mochi-mqtt/server/v2/packets"
)
var Server *mqttServer.Server
func ServerMqttInit() {
// 创建新的 MQTT 服务器。
Server = mqttServer.New(&mqttServer.Options{
InlineClient: true, // 启动内联客户端
})
// 初始化数据库实例
edge := &edgeHook{deviceDao: deviceDao.NewDeviceRepository(),
productDao: productDao.NewProductRepository(),
}
// 添加自定义权限方法
err := Server.AddHook(edge, nil)
if err != nil {
log.Fatal(err)
}
// 在1883端口上创建一个 TCP 服务端。
tcp := listeners.NewTCP("t1", ":1883", nil)
err = Server.AddListener(tcp)
if err != nil {
log.Fatal(err)
}
// 在1882端口上创建一个 Websocket 服务端。
ws := listeners.NewWebsocket("ws1", ":1882", nil)
err = server.AddListener(ws)
if err != nil {
log.Fatal(err)
}
go func() {
err := Server.Serve()
if err != nil {
log.Fatal(err)
}
}()
}
type edgeHook struct {
mqttServer.HookBase
deviceDao deviceDao.DeviceRepository
productDao productDao.ProductRepository
}
func (h *edgeHook) ID() string {
return "mqtt-auth"
}
func (h *edgeHook) Provides(b byte) bool {
// 实现钩子函数
return bytes.Contains([]byte{
//MQTT连接时认证. 当用户尝试与服务器进行身份验证时调用。
mqttServer.OnConnectAuthenticate,
//MQTT topic权限控制. 当用户尝试发布或订阅主题时调用,用来检测ACL规则。
mqttServer.OnACLCheck,
//在新客户端连接并进行身份验证后,会立即调用此方法,并在会话建立和发送CONNACK之前立即调用。
mqttServer.OnSessionEstablish,
//当客户端因任何原因断开连接时调用。
mqttServer.OnDisconnect,
//当客户端向订阅者发布消息后调用。
mqttServer.OnPublished,
}, []byte{b})
}
// OnConnectAuthenticate MQTT连接时认证. 当用户尝试与服务器进行身份验证时调用。
func (h edgeHook) OnConnectAuthenticate(cl mqttServer.Client, pk packets.Packet) bool {
username := string(pk.Connect.Username)
password := string(pk.Connect.Password)
if username == "" || len(username) == 0 {
return false
}
if password == "" || len(password) == 0 {
return false
}
return true
}
// OnACLCheck MQTT topic权限控制. 当用户尝试发布或订阅主题时调用,用来检测ACL规则。
func (h edgeHook) OnACLCheck(cl mqttServer.Client, topic string, write bool) bool {
username := string(cl.Properties.Username)
if username == "" || len(username) == 0 {
return false
}
if topic == "" || len(topic) == 0 {
return false
}
return true
}
// OnSessionEstablish 在新客户端连接并进行身份验证后,会立即调用此方法,并在会话建立和发送CONNACK之前立即调用。
func (h edgeHook) OnSessionEstablish(cl mqttServer.Client, pk packets.Packet) {
username := string(cl.Properties.Username)
if username == "" || len(username) == 0 {
return
}
//设备连接MQTT成功后保存设备在线状态
}
// OnDisconnect 当客户端因任何原因断开连接时调用。
func (h edgeHook) OnDisconnect(cl mqttServer.Client, err error, expire bool) {
username := string(cl.Properties.Username)
if username == "" || len(username) == 0 {
return
}
//设备断开MQTT成功后保存设备离线状态
}
// OnPublished 当客户端向订阅者发布消息后调用。
func (h edgeHook) OnPublished(cl mqttServer.Client, pk packets.Packet) {
Log.Infof("mqtt server OnPublished info topic=%s, msg=%s", pk.TopicName, string(pk.Payload))
//收到客户端消息后做业务逻辑处理
}
// 使用内联客户端方式,向MQTT发送消息
func PublishMsg(topic string, msg []byte) bool {
err := Server.Publish(topic, msg, false, 0)
if err != nil {
Log.Errorf("mqtt EdgePublish error=%v, topic=%s, msg=%s", err, topic, msg)
return false
}
return true
}
// 使用内联客户端方式,订阅边缘MQTT消息topic
func SubscribeTopic(topic string, subscriptionId int, callback func(topic string, msg []byte)) {
callbackFn := func(cl *mqttServer.Client, sub packets.Subscription, pk packets.Packet) {
Log.Info("mqtt EdgeSubscribe received message", "client", cl.ID, "subscriptionId", sub.Identifier,
"topic", pk.TopicName, "payload", string(pk.Payload))
callback(pk.TopicName, pk.Payload)
}
_ = Server.Subscribe(topic, subscriptionId, callbackFn)
}
// 使用内联客户端方式,取消订阅边缘MQTT消息topic
func UnsubscribeTopic(topic string, subscriptionId int) {
_ = Server.Unsubscribe(topic, subscriptionId)
}
func main() {
// 创建信号用于等待服务端关闭信号
sigs := make(chan os.Signal, 1)
done := make(chan bool, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
go func() {
<-sigs
done <- true
}()
<-done
Log.Error("caught signal, stopping...")
Server.Close()
Log.Error("main.go finished")
}