laravel实现利用RabbitMQ实现MQTT即时通讯
有时候我们的项目中会用到即时通讯功能,比如电商系统中的客服聊天功能,还有在支付过程中,当用户支付成功后,第三方支付服务会回调我们的回调接口,此时我们需要通知前端支付成功。而 RabbitMQ
可以很方便的实现即时通讯功能,如果你的业务只是少量地方使用即时通信,需要一个简易的消息系统,你可以直接考虑 MQ
的实现, MQ
有很高的吞吐率,具有持久化,还可以横向扩展,总之还不错,用就完了,奥利给!
本文需要安装好 rabbitMQ
和 laravel
,没弄好环境的看我之前的文章 php laravel5.5使用rabbitmq消息队列
MQTT协议
MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的轻量级通讯协议,该协议构建于TCP/IP
协议上。 MQTT
最大优点在于,可以以极少的代码和有限的带宽,为连接远程设备提供实时可靠的消息服务。
MQTT相关概念
实际上还是 MQ
的那些东西,主要看 MQ
有没有实现 MQTT
模型,懂的随便看看,不懂的先去理解 MQ
- Publisher(发布者):消息的发出者,负责发送消息。
- Subscriber(订阅者):消息的订阅者,负责接收并处理消息。
- Broker(代理):消息代理,位于消息发布者和订阅者之间,各类支持MQTT协议的消息中间件都可以充当。
- Topic(主题):可以理解为消息队列中的路由,订阅者订阅了主题之后,就可以收到发送到该主题的消息。
- Payload(负载);可以理解为发送消息的内容。
- QoS(消息质量):全称Quality of Service,即消息的发送质量,主要有QoS 0、QoS 1、QoS 2三个等级,下面分别介绍下:
- QoS 0(Almost Once):至多一次,只发送一次,会发生消息丢失或重复;
- QoS 1(Atleast Once):至少一次,确保消息到达,但消息重复可能会发生;
- QoS 2(Exactly Once):只有一次,确保消息只到达一次。
RabbitMQ启用MQTT功能
我们是采用 docker
安装的,直接进入容器一顿操作就行
docker exec -it rabbitmq bash rabbitmq-plugins enable rabbitmq_mqtt
开启成功后,查看管理控制台,我们可以发现 MQTT
服务运行在 1883
端口上了。
MQTT客户端
我们可以使用 MQTT
客户端来测试 MQTT
的即时通讯功能,这里使用的是 MQTTBox
这个客户端工具。
首先下载并安装好 MQTTBox
,下载地址:http://workswithweb.com/mqttbox.html
点击 Create MQTT Client
按钮来创建一个 MQTT
客户端;
接下来对 MQTT
客户端进行配置,主要是配置好协议端口、连接用户名密码和QoS即可, 注意 Protocol
是 mqtt/tcp
然后我们利用这个工具测试一下发布和订阅消息是否可用,一端向 TopicA
发送消息,另一端订阅 TopicA
可用看到效果已经出现了,那么我们如何让前端来订阅呢?
前端实现即时通讯
我们通过 html+javascript
实现一个简单的聊天功能,由于 RabbitMQ
与 Web端
交互底层使用的是 WebSocket
,所以我们需要开启 RabbitMQ
的 MQTT WEB
支持,使用如下命令开启即可
docker exec -it rabbitmq bash rabbitmq-plugins enable rabbitmq_web_mqtt
开启成功后,查看管理控制台,我们可以发现 MQTT
的 WEB
服务运行在 15675
端口上了;
WEB端
与 MQTT
服务进行通讯需要使用一个叫 MQTT.js
的库,项目地址:https://github.com/mqttjs/MQTT.js
实现的功能非常简单,一个单聊功能,需要注意的是配置好 MQTT
服务的访问地址为:ws://localhost:15675/ws
<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <title>Title</title> </head> <body> <div> <label>目标Topic:<input id="targetTopicInput" type="text"></label><br> <label>发送消息:<input id="messageInput" type="text"></label><br> <button onclick="sendMessage()">发送</button> <button onclick="clearMessage()">清空</button> <div id="messageDiv"></div> </div> </body> <script src="https://unpkg.com/mqtt/dist/mqtt.min.js"></script> <script> //RabbitMQ的web-mqtt连接地址 const url = 'ws://ip:15675/ws'; //获取订阅的topic const topic = getQueryString("topic"); //连接到消息队列 let client = mqtt.connect(url); client.on('connect', function () { //连接成功后订阅topic client.subscribe(topic, function (err) { if (!err) { showMessage("订阅topic:" + topic + "成功!"); } }); }); //获取订阅topic中的消息 client.on('message', function (topic, message) { showMessage("收到消息:" + message.toString()); }); //发送消息 function sendMessage() { let targetTopic = document.getElementById("targetTopicInput").value; let message = document.getElementById("messageInput").value; //向目标topic中发送消息 client.publish(targetTopic, message); showMessage("发送消息给" + targetTopic + "的消息:" + message); } //从URL中获取参数 function getQueryString(name) { let reg = new RegExp("(^|&)" + name + "=([^&]*)(&|$)", "i"); let r = window.location.search.substr(1).match(reg); if (r != null) { return decodeURIComponent(r[2]); } return null; } //在消息列表中展示消息 function showMessage(message) { let messageDiv = document.getElementById("messageDiv"); let messageEle = document.createElement("div"); messageEle.innerText = message; messageDiv.appendChild(messageEle); } //清空消息列表 function clearMessage() { let messageDiv = document.getElementById("messageDiv"); messageDiv.innerHTML = ""; } </script> </html>
在Laravel中使用
需要保证 laravel
和 rabbitmq
已经可以正常生产和发布消息了,保证没问题再进行以下操作
- 安装mqtt包
composer require salmanzafar/laravel-mqtt
- app.php
/* * Application Service Providers... */ Salman\Mqtt\MqttServiceProvider::class,
- MqttService
<?php namespace App\Service; use Illuminate\Support\Facades\Auth; use Salman\Mqtt\MqttClass\Mqtt; class MqttService { public static function SendMsgViaMqtt($topic, $message) { $mqtt = new Mqtt(); $client_id = Auth::user()->id ?? 0; $output = $mqtt->ConnectAndPublish($topic, $message, $client_id); if ($output === true) { return "published"; } return "Failed"; } }
- laravel生产消息
Route::get('/pub', function () { $data = [ 'name' => 'zhangsan', 'age' => 18 ]; $res = \App\Service\MqttService::SendMsgViaMqtt('topicA', json_encode($data)); dump($res); });
请求路由测试
注意:通过url的queryString进行topic订阅
总结
消息中间件应用越来越广泛,不仅可以实现可靠的异步通信,还可以实现即时通讯,掌握一个消息中间件还是很有必要的。像普通的订单下了给后台一个推送等等,都可以选择采用 MQ
实现,方便好用!奥利给!!