vue3中使用mqtt数据传输(封装)

简介: vue3中使用mqtt数据传输(封装)

使用版本
"mqtt": "^5.8.0",
1
安装指令

npm install mqtt --save

yarn add mqtt

1
2
3
4
介绍mqtt
参考使用文档

配置
connection: {
protocol: "ws",
host: "broker.emqx.io",
port: 8083,
endpoint: "/mqtt",
clean: true,
connectTimeout: 30 * 1000, // ms
reconnectPeriod: 4000, // ms
clientId: "emqxvue" + Math.random().toString(16).substring(2, 8),
// 随机数 每次不能重复
username: "emqx_test",
password: "emqx_test",
},
1
2
3
4
5
6
7
8
9
10
11
12
13
连接
import mqtt from "mqtt";
let client = {}
client = mqtt.connect(url, options)

client.on('connect', (e) => {
// 订阅主题

})
1
2
3
4
5
6
7
8
订阅主题
client.subscribe(topic, { qos: 1 }, (err) => {
if (!err) {
console.log('订阅成功')
} else {
console.log('消息订阅失败!')
}
})
1
2
3
4
5
6
7
消息发布
给后端发送格式,是和后端约定好的数据格式,一般为JSON传输。

client.publish(publishTopic, {"messageType":1,"messageContent":""}, { qos: 0 }, (err) => {
if (!err) {
console.log('发送成功')
client.subscribe(topic, { qos: 1 }, (err) => {
if (!err) {
console.log('订阅成功')
} else {
console.log('消息订阅失败!')
}
})
} else {
console.log('消息发送失败!')
}
})
1
2
3
4
5
6
7
8
9
10
11
12
13
14
取消订阅
client.unsubscribe(topicList, (error) => {
console.log('主题为' + topicList + '取消订阅成功', error)
})
1
2
3
断开连接
export function unconnect() {
client.end()
client = null
// Message.warning('服务器已断开连接!')
console.log('服务器已断开连接!')
}
1
2
3
4
5
6
mqtt封装使用(ts版)
import type { IClientOptions, MqttClient } from 'mqtt';
import mqtt from 'mqtt';

interface ClientOptions extends IClientOptions {
clientId: string;
}

interface SubscribeOptions {
topic: string;
callback: (topic: string, message: string) => void;
subscribeOption?: mqtt.IClientSubscribeOptions;
}

interface PublishOptions {
topic: string;
message: string;
}

class Mqtt {
private static instance: Mqtt;
private client: MqttClient | undefined;
private subscribeMembers: Record void) | undefined> = {};
private pendingSubscriptions: SubscribeOptions[] = [];
private pendingPublications: PublishOptions[] = [];
private isConnected: boolean = false;

private constructor(url?: string) {
if (url) {
this.connect(url);
}
}

public static getInstance(url?: string): Mqtt {
if (!Mqtt.instance) {
Mqtt.instance = new Mqtt(url);
} else if (url && !Mqtt.instance.client) {
Mqtt.instance.connect(url);
}
return Mqtt.instance;
}

private connect(url: string): void {
console.log(url, clientOptions);
if (!this.client) {
this.client = mqtt.connect(url, clientOptions);
this.client.on('connect', this.onConnect);
this.client.on('reconnect', this.onReconnect);
this.client.on('error', this.onError);
this.client.on('message', this.onMessage);
}
}

public disconnect(): void {
if (this.client) {
this.client.end();
this.client = undefined;
this.subscribeMembers = {};
this.isConnected = false;
console.log(服务器已断开连接!);
}
}

public subscribe({ topic, callback }: SubscribeOptions): void {
if (this.isConnected) {
this.client?.subscribe(topic, { qos: 1 }, error => {
if (error) {
console.log(客户端: ${clientOptions.clientId}, 订阅主题: ${topic}失败:, error);
} else {
console.log(客户端: ${clientOptions.clientId}, 订阅主题: ${topic}成功);
}
});
this.subscribeMembers[topic] = callback;
} else {
this.pendingSubscriptions.push({ topic, callback });
}
}

public unsubscribe(topic: string): void {
if (!this.client) {
return;
}
this.client.unsubscribe(topic, error => {
if (error) {
console.log(客户端: ${clientOptions.clientId}, 取消订阅主题: ${topic}失败:, error);
} else {
console.log(客户端: ${clientOptions.clientId}, 取消订阅主题: ${topic}成功);
}
});
this.subscribeMembers[topic] = undefined;
}

public publish({ topic, message }: PublishOptions): void {
if (this.isConnected) {
this.client?.publish(topic, message, { qos: 1 }, e => {
if (e) {
console.log(客户端: ${clientOptions.clientId}, 发送主题为: ${topic} 的消息, 发送失败:, e);
}
});
} else {
this.pendingPublications.push({ topic, message });
}
}

private onConnect = (e: any): void => {
console.log(客户端: ${clientOptions.clientId}, 连接服务器成功:, e);
this.isConnected = true;
this.processPendingSubscriptions();
this.processPendingPublications();
};

private onReconnect = (): void => {
console.log(客户端: ${clientOptions.clientId}, 正在重连:);
this.isConnected = false;
};

private onError = (error: Error): void => {
console.log(客户端: ${clientOptions.clientId}, 连接失败:, error);
this.isConnected = false;
};

private onMessage = (topic: string, message: Buffer): void => {
// console.log(
// 客户端: ${clientOptions.clientId}, 接收到来自主题: ${topic} 的消息:,
// message.toString(),
// );
const callback = this.subscribeMembers?.[topic];
callback?.(topic, message.toString());
};

private processPendingSubscriptions(): void {
while (this.pendingSubscriptions.length > 0) {
const { topic, callback, subscribeOption } = this.pendingSubscriptions.shift()!;
this.subscribe({ topic, callback, subscribeOption });
}
}

private processPendingPublications(): void {
while (this.pendingPublications.length > 0) {
const { topic, message } = this.pendingPublications.shift()!;
this.publish({ topic, message });
}
}
}

const clientOptions: ClientOptions = {
clean: true,
connectTimeout: 500,
protocolVersion: 5,
rejectUnauthorized: false,
username: 'admin',
password: 'Anjian-emqx',
clientId: client-${Date.now()}
};

// export default Mqtt.getInstance("ws://192.168.11.14:8083/mqtt");
// export default Mqtt.getInstance("ws://192.168.11.14:8083/mqtt");
// export default Mqtt.getInstance(JSON.parse(import.meta.env.VITE_OTHER_SERVICE_BASE_URL).mqtt);
const { protocol, host } = window.location;
export default Mqtt.getInstance(${protocol.replace('http', 'ws')}//${host.replace('localhost', '127.0.0.1')}/mqtt/);

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
注意:

环境配置
.env.test
VITE_OTHER_SERVICE_BASE_URL= { "mqtt": "ws://192.168.11.14:8083/mqtt" }
1
2
3
qos设置 前后端统一为1

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
消息中间件 Kafka 双11
消息中间件mq的比较:含RocketMQ、RabbitMQ、Kafka; rocketmq底层封装
消息中间件mq的比较:含RocketMQ、RabbitMQ、Kafka 共同点都是消息队列,有mq的特性 队列(先进先出原则)
313 2
消息中间件mq的比较:含RocketMQ、RabbitMQ、Kafka; rocketmq底层封装
|
消息中间件 数据安全/隐私保护 Docker
使用Docker-compose来封装celery4.1+rabbitmq3.7服务,实现微服务架构
大家都知道,Celery是一个简单、灵活且可靠的,处理大量消息的分布式系统,在之前的一篇文章中:[python3.7+Tornado5.1.1+Celery3.1+Rabbitmq3.7.16实现异步队列任务](https://v3u.cn/a_id_99)详细阐述了如何进行安装部署和使用,但是过程太繁琐了,先得安装Erlang,再安装rabbitmq,然后各种配置,最后由于async关键字问题还得去修改三方库的源码,其实我们可以通过docker来将celery服务封装成镜像,如此一来,以后再使用celery或者别的系统依赖celery,我们只需要将该镜像以容器的形式跑服务即可,不需要繁琐的配
使用Docker-compose来封装celery4.1+rabbitmq3.7服务,实现微服务架构
|
Linux
MQTT 开源代理mosquitto的网络层封装相当sucks
MQTT 开源代理mosquitto的网络层封装相当sucks
390 0
|
6月前
|
SQL 分布式计算 监控
在数据传输服务(DTS)中,要查看每个小时源端产生了多少条数据
【2月更文挑战第32天】在数据传输服务(DTS)中,要查看每个小时源端产生了多少条数据
63 6
|
6月前
|
存储 SQL NoSQL
数据传输DTS同步问题之同步失败如何解决
数据传输服务(DTS)是一项专注于数据迁移和同步的云服务,在使用过程中可能遇到多种问题,本合集精选常见的DTS数据传输问题及其答疑解惑,以助用户顺利实现数据流转。
|
6月前
|
Cloud Native NoSQL 关系型数据库
数据传输DTS校验问题之校验报错如何解决
数据传输服务(DTS)是一项专注于数据迁移和同步的云服务,在使用过程中可能遇到多种问题,本合集精选常见的DTS数据传输问题及其答疑解惑,以助用户顺利实现数据流转。
|
3月前
|
存储 安全 关系型数据库
跨越地域的数据传输大冒险!如何轻松更换DTS实例地域,全面攻略揭秘!
【8月更文挑战第15天】在数字时代的浪潮中,数据传输服务(DTS)是企业跨地域扩张的重要桥梁。然而,更换DTS实例地域就像是一场冒险旅程,充满了未知和挑战。本文将带你踏上这场跨越地域的数据传输大冒险,揭示如何轻松更换DTS实例地域的秘密。无论你是追求速度的迁移高手,还是成本敏感的手动操作者,这里都有你需要的答案。让我们一起探索这个神秘的世界,解锁数据传输的无限可能!
41 0
|
3月前
|
关系型数据库 MySQL OLAP
数据传输DTS是什么?
【8月更文挑战第30天】数据传输DTS是什么?
163 3
|
5月前
|
关系型数据库 MySQL 分布式数据库
PolarDB操作报错合集之当使用DTS(数据传输服务)同步的表在目标库中进行LEFT JOIN查询时遇到异常,是什么导致的
在使用阿里云的PolarDB(包括PolarDB-X)时,用户可能会遇到各种操作报错。下面汇总了一些常见的报错情况及其可能的原因和解决办法:1.安装PolarDB-X报错、2.PolarDB安装后无法连接、3.PolarDB-X 使用rpm安装启动卡顿、4.PolarDB执行UPDATE/INSERT报错、5.DDL操作提示“Lock conflict”、6.数据集成时联通PolarDB报错、7.编译DN报错(RockyLinux)、8.CheckStorage报错(源数据库实例被删除)、9.嵌套事务错误(TDDL-4604)。