使用版本
"mqtt": "^5.8.0",
1
安装指令
npm install mqtt --save
yarn add mqtt
1
2
3
4
介绍mqtt
参考使用文档
配置
// Connection settings for the MQTT client (a property of a larger options object).
connection: {
protocol: "ws",
host: "broker.emqx.io",
port: 8083,
endpoint: "/mqtt",
clean: true,
connectTimeout: 30 * 1000, // ms
reconnectPeriod: 4000, // ms
clientId: "emqxvue" + Math.random().toString(16).substring(2, 8),
// Random hex suffix: the client id must not repeat between connections.
username: "emqx_test",
password: "emqx_test",
},
1
2
3
4
5
6
7
8
9
10
11
12
13
连接
import mqtt from "mqtt";
// Open the broker connection; `url` and `options` are expected to be defined by the surrounding code.
let client = mqtt.connect(url, options)
client.on('connect', (connack) => {
  // Subscribe to topics here.
})
1
2
3
4
5
6
7
8
订阅主题
// Subscribe to `topic` at QoS 1 and log whether the request succeeded.
client.subscribe(topic, { qos: 1 }, (subscribeError) => {
  const outcome = subscribeError ? '消息订阅失败!' : '订阅成功'
  console.log(outcome)
})
1
2
3
4
5
6
7
消息发布
发送给后端的消息格式需要事先与后端约定,一般以 JSON 字符串传输。
// Publish a message and, once it has been sent, subscribe to `topic`.
// The payload is serialized with JSON.stringify because mqtt.js sends
// strings or Buffers, not plain objects.
// QoS 1 matches the subscribe calls and the project-wide QoS convention.
client.publish(
  publishTopic,
  JSON.stringify({ messageType: 1, messageContent: '' }),
  { qos: 1 },
  (publishError) => {
    if (publishError) {
      console.log('消息发送失败!')
      return
    }
    console.log('发送成功')
    client.subscribe(topic, { qos: 1 }, (subscribeError) => {
      if (subscribeError) {
        console.log('消息订阅失败!')
      } else {
        console.log('订阅成功')
      }
    })
  },
)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
取消订阅
// Unsubscribe from `topicList` and report the real outcome: the callback's
// `error` argument decides between the failure and the success message.
client.unsubscribe(topicList, (error) => {
  if (error) {
    console.log('主题为' + topicList + '取消订阅失败', error)
    return
  }
  console.log('主题为' + topicList + '取消订阅成功')
})
1
2
3
断开连接
/**
 * Close the module-level MQTT client and clear the reference to it.
 *
 * Safe to call repeatedly or before a connection was ever opened: when
 * `client` is null or is not a real client (no `end` method), nothing happens.
 */
export function unconnect() {
  if (!client || typeof client.end !== 'function') {
    return
  }
  client.end()
  client = null
  // A UI notification via Message.warning was disabled here; only the console log remains.
  console.log('服务器已断开连接!')
}
1
2
3
4
5
6
mqtt封装使用(ts版)
import type { IClientOptions, MqttClient } from 'mqtt';
import mqtt from 'mqtt';
/** Connection options used by this module: `IClientOptions` with `clientId` declared as a required string. */
interface ClientOptions extends IClientOptions {
clientId: string;
}
/** Argument object accepted by `Mqtt.subscribe`. */
interface SubscribeOptions {
/** Topic to subscribe to. */
topic: string;
/** Handler invoked with the topic and the message payload converted to a string. */
callback: (topic: string, message: string) => void;
/** Optional mqtt.js subscribe options for this subscription. */
subscribeOption?: mqtt.IClientSubscribeOptions;
}
/** Argument object accepted by `Mqtt.publish`. */
interface PublishOptions {
/** Topic to publish on. */
topic: string;
/** Payload, already serialized to a string by the caller. */
message: string;
}
/**
 * Singleton wrapper around a single mqtt.js client.
 *
 * Subscriptions and publications requested while the client is not connected
 * are queued and replayed from the `connect` handler. Incoming messages are
 * dispatched by exact topic-string lookup, so a wildcard filter is not matched
 * against the concrete topics it covers.
 */
class Mqtt {
  private static instance: Mqtt;
  private client: MqttClient | undefined;
  /** Message handlers keyed by the topic string they were subscribed with. */
  private subscribeMembers: Record<string, ((topic: string, message: string) => void) | undefined> = {};
  /** Subscriptions requested while disconnected, replayed on connect. */
  private pendingSubscriptions: SubscribeOptions[] = [];
  /** Publications requested while disconnected, replayed on connect. */
  private pendingPublications: PublishOptions[] = [];
  private isConnected = false;

  private constructor(url?: string) {
    if (url) {
      this.connect(url);
    }
  }

  /**
   * Returns the shared instance, creating it on first use.
   *
   * @param url Broker URL. Used when the instance is created, or later when
   *   the instance currently has no client (for example after `disconnect`).
   */
  public static getInstance(url?: string): Mqtt {
    if (!Mqtt.instance) {
      Mqtt.instance = new Mqtt(url);
    } else if (url && !Mqtt.instance.client) {
      Mqtt.instance.connect(url);
    }
    return Mqtt.instance;
  }

  /** Creates the client and wires its event handlers; does nothing if a client already exists. */
  private connect(url: string): void {
    // Log only the client id: the full options object also contains the broker password.
    console.log(url, clientOptions.clientId);
    if (!this.client) {
      this.client = mqtt.connect(url, clientOptions);
      this.client.on('connect', this.onConnect);
      this.client.on('reconnect', this.onReconnect);
      this.client.on('error', this.onError);
      this.client.on('message', this.onMessage);
    }
  }

  /**
   * Ends the client and forgets all registered message handlers.
   * Requests still waiting in the pending queues are kept.
   */
  public disconnect(): void {
    if (this.client) {
      this.client.end();
      this.client = undefined;
      this.subscribeMembers = {};
      this.isConnected = false;
      console.log(`服务器已断开连接!`);
    }
  }

  /**
   * Subscribes to `topic` and registers `callback` for messages on it.
   * While disconnected the request is queued instead.
   *
   * @param subscribeOption Optional mqtt.js subscribe options; they are merged
   *   over the default `{ qos: 1 }`.
   */
  public subscribe({ topic, callback, subscribeOption }: SubscribeOptions): void {
    if (!this.isConnected) {
      // Keep the caller's options so the replayed subscription is identical.
      this.pendingSubscriptions.push({ topic, callback, subscribeOption });
      return;
    }

    const options: mqtt.IClientSubscribeOptions = { qos: 1, ...subscribeOption };
    this.client?.subscribe(topic, options, error => {
      if (error) {
        console.log(`客户端: ${clientOptions.clientId}, 订阅主题: ${topic}失败:`, error);
      } else {
        console.log(`客户端: ${clientOptions.clientId}, 订阅主题: ${topic}成功`);
      }
    });
    this.subscribeMembers[topic] = callback;
  }

  /**
   * Unsubscribes from `topic` and removes its message handler.
   * A subscription for the topic that is still queued is discarded as well,
   * so it is not replayed on the next connect.
   */
  public unsubscribe(topic: string): void {
    this.pendingSubscriptions = this.pendingSubscriptions.filter(pending => pending.topic !== topic);
    if (!this.client) {
      return;
    }
    this.client.unsubscribe(topic, error => {
      if (error) {
        console.log(`客户端: ${clientOptions.clientId}, 取消订阅主题: ${topic}失败:`, error);
      } else {
        console.log(`客户端: ${clientOptions.clientId}, 取消订阅主题: ${topic}成功`);
      }
    });
    this.subscribeMembers[topic] = undefined;
  }

  /** Publishes `message` on `topic` at QoS 1; while disconnected the request is queued instead. */
  public publish({ topic, message }: PublishOptions): void {
    if (!this.isConnected) {
      this.pendingPublications.push({ topic, message });
      return;
    }
    this.client?.publish(topic, message, { qos: 1 }, error => {
      if (error) {
        console.log(`客户端: ${clientOptions.clientId}, 发送主题为: ${topic} 的消息, 发送失败:`, error);
      }
    });
  }

  /** `connect` handler: marks the client connected and replays everything that was queued. */
  private onConnect = (packet: unknown): void => {
    console.log(`客户端: ${clientOptions.clientId}, 连接服务器成功:`, packet);
    this.isConnected = true;
    this.processPendingSubscriptions();
    this.processPendingPublications();
  };

  /** `reconnect` handler: new requests are queued until the next `connect`. */
  private onReconnect = (): void => {
    console.log(`客户端: ${clientOptions.clientId}, 正在重连:`);
    this.isConnected = false;
  };

  /** `error` handler: logs the error and treats the client as disconnected. */
  private onError = (error: Error): void => {
    console.log(`客户端: ${clientOptions.clientId}, 连接失败:`, error);
    this.isConnected = false;
  };

  /** `message` handler: forwards the payload, as a string, to the handler registered for `topic`. */
  private onMessage = (topic: string, message: Buffer): void => {
    const callback = this.subscribeMembers[topic];
    callback?.(topic, message.toString());
  };

  /** Replays queued subscriptions in the order they were requested. */
  private processPendingSubscriptions(): void {
    // Swap the queue out first so a request that gets re-queued cannot loop forever.
    const queued = this.pendingSubscriptions;
    this.pendingSubscriptions = [];
    for (const request of queued) {
      this.subscribe(request);
    }
  }

  /** Replays queued publications in the order they were requested. */
  private processPendingPublications(): void {
    const queued = this.pendingPublications;
    this.pendingPublications = [];
    for (const request of queued) {
      this.publish(request);
    }
  }
}
/**
 * Options passed to `mqtt.connect` by the `Mqtt` wrapper.
 *
 * NOTE(review): the username and password are hard-coded in source; consider
 * loading them from environment configuration instead.
 * NOTE(review): `connectTimeout` is presumably in milliseconds, as in the
 * earlier configuration example, which makes 500 a very short timeout. Confirm.
 */
const clientOptions: ClientOptions = {
  clean: true,
  connectTimeout: 500,
  protocolVersion: 5,
  rejectUnauthorized: false,
  username: 'admin',
  password: 'Anjian-emqx',
  // The timestamp suffix gives each page load its own client id.
  clientId: `client-${Date.now()}`,
};
// export default Mqtt.getInstance("ws://192.168.11.14:8083/mqtt");
// export default Mqtt.getInstance("ws://192.168.11.14:8083/mqtt");
// export default Mqtt.getInstance(JSON.parse(import.meta.env.VITE_OTHER_SERVICE_BASE_URL).mqtt);
// Build the broker URL from the page's own origin: http(s) becomes ws(s),
// `localhost` is replaced with 127.0.0.1, and the path is `/mqtt/`.
const { protocol, host } = window.location;
export default Mqtt.getInstance(
  `${protocol.replace('http', 'ws')}//${host.replace('localhost', '127.0.0.1')}/mqtt/`,
);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
注意:
环境配置
.env.test
VITE_OTHER_SERVICE_BASE_URL= { "mqtt": "ws://192.168.11.14:8083/mqtt" }
1
2
3
QoS 设置:前后端统一使用 QoS 1。