介绍
MQTT是一种机器对机器 (M2M) 的物联网连接协议。它被设计为一个极其轻量级的发布和订阅消息传输。它对于与需要少量代码和/或网络带宽非常宝贵的远程位置的连接非常有用。
每个 MQTT 客户端订阅某些主题并在发布者开始推送这些主题的消息时接收消息。
MQTT协议实现方式
实现MQTT协议需要客户端和服务器端通讯完成,在通讯过程中,MQTT协议中有三种身份:发布者(Publish)、代理(Broker)(服务器)、订阅者(Subscribe)。其中,消息的发布者和订阅者都是客户端,消息代理是服务器,消息发布者可以同时是订阅者。
MQTT传输的消息分为:主题(Topic)和负载(payload)两部分:
(1)Topic,可以理解为消息的类型,订阅者订阅(Subscribe)后,就会收到该主题的消息内容(payload);
(2)payload,可以理解为消息的内容,是指订阅者具体要使用的内容。
如何向外扩展
水平扩展的目的是在同一应用程序的多个实例之间分配负载。如果这些实例中的 MQTT 客户端订阅了相同的主题,那么相同的 MQTT 消息将被传递到每个实例,这不是预期的。
安装MQTT服务(EMQ为例)
- 获取 Docker 镜像
docker pull emqx/emqx:4.3.7
- 启动 Docker 容器
docker run -d --name emqx -p 1883:1883 -p 8081:8081 -p 8083:8083 -p 8084:8084 -p 8883:8883 -p 18083:18083 emqx/emqx:4.3.7
- 启动
当 EMQ X 成功运行在你的本地计算机上且 EMQ X Dashboard 被默认启用时,通过访问
http://localhost:18083 来查看Dashboard,默认用户名是 admin ,密码是 public 。
安装MQTT客户端
整合springboot
- 添加maven
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.2</version>
<scope>compile</scope>
</dependency>
- 配置文件
mqtt:
inbound:
url: tcp://bs-emqx:1883
clientId: bs:iot:mqtt:client:subscribe
topics: $queue/mqtt/face/#,mqtt/Ack/#,mqtt/HeartBeat,mqtt/WillTopic,mqtt/DataStreamAck/#,mqtt/CaptureEvent/#
outbound:
url: tcp://xxx:1883
username: xxx
password: xxx
clientId: xxx
topic: mqtt/face/#
- 消息订阅配置
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;
import com.bszn.iot.mqtt.handler.BdForwardHandler;
import com.bszn.iot.mqtt.handler.ForwardHandler;
import com.bszn.iot.mqtt.model.PushPayload;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.stereotype.Component;
/**
* @Auther: gc.x
* @Date: 2020/6/11 9:27
* @Description: 消息订阅配置
*/
@Component
@Configuration
@Slf4j
public class MqttInboundConfiguration {
@Autowired
private MqttProperties mqttProperties;
@Autowired
private ForwardHandler forwardHandler;
@Autowired
private BdForwardHandler bdForwardHandler;
@Bean
public MessageChannel mqttInputChannel() {
return new DirectChannel();
}
/**
* MQTT 消息订阅绑定(消费者)
* @param mqttPahoClientFactory
* @return
*/
@Bean
public MessageProducer inbound(MqttPahoClientFactory mqttPahoClientFactory) {
String[] inboundTopics = mqttProperties.getInbound().getTopics().split(",");
MqttPahoMessageDrivenChannelAdapter adapter =
new MqttPahoMessageDrivenChannelAdapter(mqttProperties.getInbound().getClientId(),
mqttPahoClientFactory,inboundTopics);
log.info("client:{} 绑定消费者", mqttProperties.getInbound().getClientId());
adapter.setCompletionTimeout(5000);
// 配置默认Paho消息转换器(qos=0, retain=false, charset=UTF-8)
adapter.setConverter(new DefaultPahoMessageConverter());
/*
* 设置Qos等级
* level 0:最多一次的传输
* level 1:至少一次的传输
* level 2: 有且只有一次传输 保证消息投递成功
*/
adapter.setQos(2);
//设置订阅通道
adapter.setOutputChannel(mqttInputChannel());
return adapter;
}
@Bean
//ServiceActivator注解表明当前方法用于处理MQTT消息,inputChannel参数指定了用于接收消息信息的channel。
@ServiceActivator(inputChannel = "mqttInputChannel")
public MessageHandler handler() {
return message -> {
try{
String vendorName = getVendorName(message.getPayload().toString());
// 入口 分发不同消息
switch (vendorName){
case "hqvt":
PushPayload<JSONObject> pushPayload = JSONObject.parseObject(message.getPayload().toString(), new TypeReference<PushPayload<JSONObject>>(){});
this.forwardHandler.receive(pushPayload);
break;
case "bigdragon":
JSONObject jsonObject = JSONObject.parseObject(message.getPayload().toString());
this.bdForwardHandler.receive(jsonObject);
break;
}
}catch (Exception e){
log.error("MQTT's message parse JSONObject fail:{}", e.getMessage());
}
};
}
private String getVendorName(String message){
JSONObject jsonObject = JSONObject.parseObject(message);
if(StringUtils.isNotBlank(jsonObject.getString("operator"))){
return "hqvt";
}else if(StringUtils.isNotBlank(jsonObject.getString("Action"))){
return "bigdragon";
}
return "";
}
}
- 消息发布配置
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
/**
* @Auther: gc.x
* @Date: 2020/6/11 9:34
* @Description: 消息发送
*/
@Configuration
@Slf4j
public class MqttOutboundConfiguration {
@Autowired
private MqttProperties mqttProperties;
@Bean
public MqttConnectOptions getMqttConnectOptions() {
MqttConnectOptions options = new MqttConnectOptions();
// 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,
// 这里设置为true表示每次连接到服务器都以新的身份连接
options.setCleanSession(true);
// 设置连接的用户名
options.setUserName(mqttProperties.getOutbound().getUsername());
// 设置连接的密码
options.setPassword(mqttProperties.getOutbound().getPassword().toCharArray());
options.setServerURIs(mqttProperties.getOutbound().getUrl().split(","));
// 设置超时时间 单位为秒
options.setConnectionTimeout(10);
// 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送心跳判断客户端是否在线,但这个方法并没有重连的机制
options.setKeepAliveInterval(20);
options.setMaxInflight(1000);
// 设置“遗嘱”消息的话题,若客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息。
//options.setWill("willTopic", WILL_DATA, 2, false);
return options;
}
@Bean
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
factory.setConnectionOptions(getMqttConnectOptions());
return factory;
}
@Bean
@ServiceActivator(inputChannel = "mqttOutboundChannel")
public MessageHandler mqttOutbound() {
MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(mqttProperties.getOutbound().getClientId(), mqttClientFactory());
//BsMqttPahoMessageHandler messageHandler = new BsMqttPahoMessageHandler(mqttProperties.getOutbound().getClientId(), mqttClientFactory());
log.info("client:{} 绑定生产者者", mqttProperties.getOutbound().getClientId());
messageHandler.setAsync(true);
messageHandler.setDefaultTopic(mqttProperties.getOutbound().getTopic());
//messageHandler.onInit();
return messageHandler;
}
@Bean
public MessageChannel mqttOutboundChannel() {
return new DirectChannel();
}
}