vue3中使用mqtt数据传输(封装)

简介: vue3中使用mqtt数据传输(封装)

使用版本
"mqtt": "^5.8.0",
1
安装指令

npm install mqtt --save

yarn add mqtt

1
2
3
4
介绍mqtt
参考使用文档

配置
connection: {
protocol: "ws",
host: "broker.emqx.io",
port: 8083,
endpoint: "/mqtt",
clean: true,
connectTimeout: 30 * 1000, // ms
reconnectPeriod: 4000, // ms
clientId: "emqxvue" + Math.random().toString(16).substring(2, 8),
// 随机数 每次不能重复
username: "emqx_test",
password: "emqx_test",
},
1
2
3
4
5
6
7
8
9
10
11
12
13
连接
import mqtt from "mqtt";
let client = {}
client = mqtt.connect(url, options)

client.on('connect', (e) => {
// 订阅主题

})
1
2
3
4
5
6
7
8
订阅主题
client.subscribe(topic, { qos: 1 }, (err) => {
if (!err) {
console.log('订阅成功')
} else {
console.log('消息订阅失败!')
}
})
1
2
3
4
5
6
7
消息发布
给后端发送格式,是和后端约定好的数据格式,一般为JSON传输。

client.publish(publishTopic, {"messageType":1,"messageContent":""}, { qos: 0 }, (err) => {
if (!err) {
console.log('发送成功')
client.subscribe(topic, { qos: 1 }, (err) => {
if (!err) {
console.log('订阅成功')
} else {
console.log('消息订阅失败!')
}
})
} else {
console.log('消息发送失败!')
}
})
1
2
3
4
5
6
7
8
9
10
11
12
13
14
取消订阅
client.unsubscribe(topicList, (error) => {
console.log('主题为' + topicList + '取消订阅成功', error)
})
1
2
3
断开连接
export function unconnect() {
client.end()
client = null
// Message.warning('服务器已断开连接!')
console.log('服务器已断开连接!')
}
1
2
3
4
5
6
mqtt封装使用(ts版)
import type { IClientOptions, MqttClient } from 'mqtt';
import mqtt from 'mqtt';

interface ClientOptions extends IClientOptions {
clientId: string;
}

interface SubscribeOptions {
topic: string;
callback: (topic: string, message: string) => void;
subscribeOption?: mqtt.IClientSubscribeOptions;
}

interface PublishOptions {
topic: string;
message: string;
}

class Mqtt {
private static instance: Mqtt;
private client: MqttClient | undefined;
private subscribeMembers: Record void) | undefined> = {};
private pendingSubscriptions: SubscribeOptions[] = [];
private pendingPublications: PublishOptions[] = [];
private isConnected: boolean = false;

private constructor(url?: string) {
if (url) {
this.connect(url);
}
}

public static getInstance(url?: string): Mqtt {
if (!Mqtt.instance) {
Mqtt.instance = new Mqtt(url);
} else if (url && !Mqtt.instance.client) {
Mqtt.instance.connect(url);
}
return Mqtt.instance;
}

private connect(url: string): void {
console.log(url, clientOptions);
if (!this.client) {
this.client = mqtt.connect(url, clientOptions);
this.client.on('connect', this.onConnect);
this.client.on('reconnect', this.onReconnect);
this.client.on('error', this.onError);
this.client.on('message', this.onMessage);
}
}

public disconnect(): void {
if (this.client) {
this.client.end();
this.client = undefined;
this.subscribeMembers = {};
this.isConnected = false;
console.log(服务器已断开连接!);
}
}

public subscribe({ topic, callback }: SubscribeOptions): void {
if (this.isConnected) {
this.client?.subscribe(topic, { qos: 1 }, error => {
if (error) {
console.log(客户端: ${clientOptions.clientId}, 订阅主题: ${topic}失败:, error);
} else {
console.log(客户端: ${clientOptions.clientId}, 订阅主题: ${topic}成功);
}
});
this.subscribeMembers[topic] = callback;
} else {
this.pendingSubscriptions.push({ topic, callback });
}
}

public unsubscribe(topic: string): void {
if (!this.client) {
return;
}
this.client.unsubscribe(topic, error => {
if (error) {
console.log(客户端: ${clientOptions.clientId}, 取消订阅主题: ${topic}失败:, error);
} else {
console.log(客户端: ${clientOptions.clientId}, 取消订阅主题: ${topic}成功);
}
});
this.subscribeMembers[topic] = undefined;
}

public publish({ topic, message }: PublishOptions): void {
if (this.isConnected) {
this.client?.publish(topic, message, { qos: 1 }, e => {
if (e) {
console.log(客户端: ${clientOptions.clientId}, 发送主题为: ${topic} 的消息, 发送失败:, e);
}
});
} else {
this.pendingPublications.push({ topic, message });
}
}

private onConnect = (e: any): void => {
console.log(客户端: ${clientOptions.clientId}, 连接服务器成功:, e);
this.isConnected = true;
this.processPendingSubscriptions();
this.processPendingPublications();
};

private onReconnect = (): void => {
console.log(客户端: ${clientOptions.clientId}, 正在重连:);
this.isConnected = false;
};

private onError = (error: Error): void => {
console.log(客户端: ${clientOptions.clientId}, 连接失败:, error);
this.isConnected = false;
};

private onMessage = (topic: string, message: Buffer): void => {
// console.log(
// 客户端: ${clientOptions.clientId}, 接收到来自主题: ${topic} 的消息:,
// message.toString(),
// );
const callback = this.subscribeMembers?.[topic];
callback?.(topic, message.toString());
};

private processPendingSubscriptions(): void {
while (this.pendingSubscriptions.length > 0) {
const { topic, callback, subscribeOption } = this.pendingSubscriptions.shift()!;
this.subscribe({ topic, callback, subscribeOption });
}
}

private processPendingPublications(): void {
while (this.pendingPublications.length > 0) {
const { topic, message } = this.pendingPublications.shift()!;
this.publish({ topic, message });
}
}
}

const clientOptions: ClientOptions = {
clean: true,
connectTimeout: 500,
protocolVersion: 5,
rejectUnauthorized: false,
username: 'admin',
password: 'Anjian-emqx',
clientId: client-${Date.now()}
};

// export default Mqtt.getInstance("ws://192.168.11.14:8083/mqtt");
// export default Mqtt.getInstance("ws://192.168.11.14:8083/mqtt");
// export default Mqtt.getInstance(JSON.parse(import.meta.env.VITE_OTHER_SERVICE_BASE_URL).mqtt);
const { protocol, host } = window.location;
export default Mqtt.getInstance(${protocol.replace('http', 'ws')}//${host.replace('localhost', '127.0.0.1')}/mqtt/);

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
注意:

环境配置
.env.test
VITE_OTHER_SERVICE_BASE_URL= { "mqtt": "ws://192.168.11.14:8083/mqtt" }
1
2
3
qos设置 前后端统一为1

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
消息中间件 Kafka 双11
消息中间件mq的比较:含RocketMQ、RabbitMQ、Kafka; rocketmq底层封装
消息中间件mq的比较:含RocketMQ、RabbitMQ、Kafka 共同点都是消息队列,有mq的特性 队列(先进先出原则)
327 2
消息中间件mq的比较:含RocketMQ、RabbitMQ、Kafka; rocketmq底层封装
|
消息中间件 数据安全/隐私保护 Docker
使用Docker-compose来封装celery4.1+rabbitmq3.7服务,实现微服务架构
大家都知道,Celery是一个简单、灵活且可靠的,处理大量消息的分布式系统,在之前的一篇文章中:[python3.7+Tornado5.1.1+Celery3.1+Rabbitmq3.7.16实现异步队列任务](https://v3u.cn/a_id_99)详细阐述了如何进行安装部署和使用,但是过程太繁琐了,先得安装Erlang,再安装rabbitmq,然后各种配置,最后由于async关键字问题还得去修改三方库的源码,其实我们可以通过docker来将celery服务封装成镜像,如此一来,以后再使用celery或者别的系统依赖celery,我们只需要将该镜像以容器的形式跑服务即可,不需要繁琐的配
使用Docker-compose来封装celery4.1+rabbitmq3.7服务,实现微服务架构
|
Linux
MQTT 开源代理mosquitto的网络层封装相当sucks
MQTT 开源代理mosquitto的网络层封装相当sucks
404 0
|
27天前
|
消息中间件 JSON Java
开发者如何使用轻量消息队列MNS
【10月更文挑战第19天】开发者如何使用轻量消息队列MNS
66 4
|
22天前
|
消息中间件 存储 Kafka
MQ 消息队列核心原理,12 条最全面总结!
本文总结了消息队列的12个核心原理,涵盖消息顺序性、ACK机制、持久化及高可用性等内容。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
|
25天前
|
消息中间件
解决方案 | 云消息队列RabbitMQ实践获奖名单公布!
云消息队列RabbitMQ实践获奖名单公布!
|
1月前
|
消息中间件 安全 Java
云消息队列RabbitMQ实践解决方案评测
一文带你详细了解云消息队列RabbitMQ实践的解决方案优与劣
69 6
|
1月前
|
消息中间件 存储 弹性计算
云消息队列RabbitMQ实践
云消息队列RabbitMQ实践
|
1月前
|
消息中间件 存储 监控
解决方案 | 云消息队列RabbitMQ实践
在实际业务中,网站因消息堆积和高流量脉冲导致系统故障。为解决这些问题,云消息队列 RabbitMQ 版提供高性能的消息处理和海量消息堆积能力,确保系统在流量高峰时仍能稳定运行。迁移前需进行技术能力和成本效益评估,包括功能、性能、限制值及费用等方面。迁移步骤包括元数据迁移、创建用户、网络打通和数据迁移。
64 4