基于RabbitMQ的MQTT实现

简介: 基于RabbitMQ的MQTT实现

1.RabbitMQ mqtt协议开启

默认情况下RabbitMQ是不开启MQTT协议的,所以需要我们手动的开启相关的插件,而RabbitMQ的MQTT协议分为两种。

  • rabbitmq_mqtt 提供与后端服务交互使用,对应端口1883
  • rabbitmq_web_mqtt 提供与前端交互使用,对应端口15675

打开cmd窗口,进入RabbitMQ的sbin目录

开启rabbitmq_mqtt协议

rabbitmq-plugins enable rabbitmq_mqtt

开启rabbitmq_web_mqtt协议

rabbitmq-plugins enable rabbitmq_web_mqtt

重启RabbitMQ后,登录RabbitMQ管理后台

http://127.0.0.1:15672

3.mqtt相关概念:

  • Publisher(发布者):消息的发出者,负责生产数据。发布者发送某个主题的数据给经纪人,发布者不知道订阅者。
  • Subscriber(订阅者):消息的订阅者,订阅经纪人管理的某个或者某几个主题。
  • Broker(经纪人):当经纪人接收到某个主题的数据时,将数据发送给这个主题的所有订阅者。
  • Topic(主题):可以理解为消息队列中的路由,订阅者订阅了主题之后,就可以收到发送到该主题的消息。
  • Payload(负载);可以理解为发送消息的内容。
  • QoS(消息质量):全称 Quality of Service,即消息的发送质量,主要有 QoS 0、QoS 1、QoS 2三个等级,下面分别介绍下:
    (1) QoS 0(Almost Once):至多一次,只发送一次,会发生消息丢失或重复;
    (2) QoS 1(Atleast Once):至少一次,确保消息到达,但消息重复可能会发生;
    (3) QoS 2(Exactly Once):只有一次,确保消息只到达一次。

3.Spring整合mqtt

  • 创建项目

pom.xml文件引入如下依赖

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.olive</groupId>
  <artifactId>rabbitmq-mqtt-demo</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.7.7</version>
    <relativePath />
  </parent>
  <properties>
    <maven.compiler.target>1.8</maven.compiler.target>
    <maven.compiler.source>1.8</maven.compiler.source>
  </properties>
  <dependencies>
    <dependency>
      <groupId>org.springframework.integration</groupId>
      <artifactId>spring-integration-mqtt</artifactId>
    </dependency>
    <dependency>
      <groupId>org.eclipse.paho</groupId>
      <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
      <version>1.2.1</version>
    </dependency>
<dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-test</artifactId>
      <scope>test</scope>
    </dependency>
  </dependencies>
</project>
  • mqtt连接配置类
package com.olive.config;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
@Configuration
public class MqttConfig {
  private static String servers[] = { "tcp://127.0.0.1:1883" };
  private static String username = "admin";
  private static String password = "admin123";
  @Bean
  public MqttConnectOptions getMqttConnectOptions() {
    MqttConnectOptions options = new MqttConnectOptions();
    // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,
    // 这里设置为true表示每次连接到服务器都以新的身份连接
    options.setCleanSession(true);
    // 设置连接的用户名
    options.setUserName(username);
    // 设置连接的密码
    options.setPassword(password.toCharArray());
    options.setServerURIs(servers);
    // 设置超时时间 单位为秒
    options.setConnectionTimeout(10);
    // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送心跳判断客户端是否在线,但这个方法并没有重连的机制
    options.setKeepAliveInterval(20);
    // 设置“遗嘱”消息的话题,若客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息。
    //options.setWill("willTopic", WILL_DATA, 2, false);
    return options;
  }
  @Bean
  public MqttPahoClientFactory mqttClientFactory() {
    DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
    factory.setConnectionOptions(getMqttConnectOptions());
    return factory;
  }
}
  • 消息生产者配置类
package com.olive.config;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
@Configuration
public class MqttProducerConfig {
  public static final String CHANNEL_NAME_OUT = "mqttOutboundChannel";
  private static String clientId = "test_mqtt/producer";
  private static String topic = "test_mqtt_topic";
  @Autowired
  MqttPahoClientFactory mqttClientFactory;
  /**
   * MQTT信息通道(生产者)
   */
  @Bean(name = CHANNEL_NAME_OUT)
  public MessageChannel mqttOutboundChannel() {
    return new DirectChannel();
  }
  /**
   * MQTT消息处理器(生产者)
   */
  @Bean
  @ServiceActivator(inputChannel = CHANNEL_NAME_OUT)
  public MessageHandler mqttOutbound() {
    MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(clientId, mqttClientFactory);
    messageHandler.setAsync(false);
    messageHandler.setDefaultTopic(topic);
    return messageHandler;
  }
}
  • 消息监听器配置类
package com.olive.config;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
@Configuration
public class MqttListener {
public static final String CHANNEL_NAME_IN = "mqttInboundChannel";
private static String clientId = "test_mqtt/consumer";
private static String listenTopic = "test_mqtt_topic";
@Autowired
    MqttPahoClientFactory mqttClientFactory;
/**
     * MQTT消息通道(消费者)
     */
@Bean(name = CHANNEL_NAME_IN)
public MessageChannel mqttInboundChannel() {
return new DirectChannel();
    }
/**
     * MQTT消息订阅绑定(消费者)
     */
@Bean
public MessageProducer inbound() {
MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(clientId,
            mqttClientFactory, 
            listenTopic);
        adapter.setCompletionTimeout(5000);
        adapter.setConverter(new DefaultPahoMessageConverter());
//设置消息质量:0->至多一次;1->至少一次;2->只有一次
        adapter.setQos(1);
        adapter.setOutputChannel(mqttInboundChannel());
return adapter;
    }
/**
     * MQTT消息监听器(消费者)
     * MessageHandler: org.springframework:spring-messaging
     */
@Bean
@ServiceActivator(inputChannel = CHANNEL_NAME_IN)
public MessageHandler handlerMessage() {
return message -> {
try {
MessageHeaders messageHeaders = message.getHeaders();
                 System.out.println("messageHeaders>>" + messageHeaders);
String string = message.getPayload().toString();
                System.out.println("接收到消息:" + string);
            } catch (MessagingException e) {
                e.printStackTrace();
            }
        };
    }
}

handlerMessage()方法也可以独立出来,如下

package com.olive.handler;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.MessagingException;
import org.springframework.stereotype.Component;
@Component
public class CustomMessageHandler implements MessageHandler {
  @ServiceActivator(inputChannel = MqttListener.CHANNEL_NAME_IN)
  @Override
  public void handleMessage(Message<?> message) throws MessagingException {
    try {
      MessageHeaders messageHeaders = message.getHeaders();
      System.out.println("messageHeaders>>" + messageHeaders);
      String string = message.getPayload().toString();
      System.out.println("接收到消息:" + string);
    } catch (MessagingException e) {
      e.printStackTrace();
    }
  }
}
  • 消息发送服务
package com.olive.service;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
import com.olive.config.MqttProducerConfig;
@Component
@MessagingGateway(defaultRequestChannel = MqttProducerConfig.CHANNEL_NAME_OUT)
public interface MqttSender {
  /**
   * 发送信息到MQTT服务器
   *
   * @param data 发送的文本
   */
  void sendToMqtt(String data);
  /**
   * 发送信息到MQTT服务器
   *
   * @param topic   主题
   * @param payload 消息主体
   */
  void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload);
  /**
   * 发送信息到MQTT服务器
   *
   * qos: 0 至多一次,数据可能丢失 1 至少一次,数据可能重复 2 只有一次,且仅有一次,最耗性能
   *
   * @param topic   主题
   * @param qos     服务质量
   * @param payload 消息主体
   */
  void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, 
      String payload);
}
  • 验证

访问如下接口发送数据

http://127.0.0.1:8080/mqtt?msg=mqttmessage

http://127.0.0.1:8080/mqtt2?msg=mqttmessage

package com.olive.controller;
import javax.annotation.Resource;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import com.olive.service.MqttSender;
@RestController
public class MqttController {
@Resource
private MqttSender mqttSender;
/**
     * 发送MQTT消息
     */
@RequestMapping(value = "/mqtt", produces = MediaType.APPLICATION_JSON_VALUE)
public ResponseEntity<String> sendMqtt(@RequestParam(value = "msg") String message) {
        System.out.println("生产MQTT消息: " + message);
        mqttSender.sendToMqtt(message);
return new ResponseEntity<>("OK", HttpStatus.OK);
    }
/**
     * 发送MQTT消息
     */
@RequestMapping(value = "/mqtt2", produces = MediaType.APPLICATION_JSON_VALUE)
public ResponseEntity<String> sendMqtt2(@RequestParam(value = "msg") String message) {
        System.out.println("生产MQTT消息:" + message);
        mqttSender.sendToMqtt("test_mqtt_topic", message);
return new ResponseEntity<>("OK", HttpStatus.OK);
    }
}
相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
5月前
|
消息中间件 Java Kafka
消息传递新纪元:探索RabbitMQ、RocketMQ和Kafka的魅力所在
【8月更文挑战第29天】这段内容介绍了在分布式系统中起到异步通信与解耦作用的消息队列,并详细探讨了三种流行的消息队列产品:RabbitMQ、RocketMQ 和 Kafka。其中,RabbitMQ 是一个基于 AMQP 协议的开源消息队列系统,支持多种消息模型;RocketMQ 则是由阿里巴巴开源的具备高性能、高可用性和高可靠性的分布式消息队列,支持事务消息等多种特性;而 Kafka 作为一个由 LinkedIn 开源的分布式流处理平台,以高吞吐量和良好的可扩展性著称。此外,还提供了使用这三种消息队列发送和接收消息的代码示例。总之,这三种消息队列各有优势,适用于不同的业务场景。
79 3
|
2月前
|
消息中间件 大数据 Kafka
大厂面试高频:Kafka、RocketMQ、RabbitMQ 的优劣势比较
本文深入探讨了消息队列的核心概念、应用场景及Kafka、RocketMQ、RabbitMQ的优劣势比较,大厂面试高频,必知必会,建议收藏。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
大厂面试高频:Kafka、RocketMQ、RabbitMQ 的优劣势比较
|
6月前
|
消息中间件 Java 测试技术
消息队列 MQ使用问题之数据流出规则是否支持平台的云RabbitMQ
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
2月前
|
消息中间件 存储 监控
ActiveMQ、RocketMQ、RabbitMQ、Kafka 的区别
【10月更文挑战第24天】ActiveMQ、RocketMQ、RabbitMQ 和 Kafka 都有各自的特点和优势,在不同的应用场景中发挥着重要作用。在选择消息队列时,需要根据具体的需求、性能要求、扩展性要求等因素进行综合考虑,选择最适合的消息队列技术。同时,随着技术的不断发展和演进,这些消息队列也在不断地更新和完善,以适应不断变化的应用需求。
119 1
|
3月前
|
消息中间件 数据采集 数据库
小说爬虫-03 爬取章节的详细内容并保存 将章节URL推送至RabbitMQ Scrapy消费MQ 对数据进行爬取后写入SQLite
小说爬虫-03 爬取章节的详细内容并保存 将章节URL推送至RabbitMQ Scrapy消费MQ 对数据进行爬取后写入SQLite
40 1
|
4月前
|
消息中间件 监控 物联网
MQTT协议对接及RabbitMQ的使用记录
通过合理对接MQTT协议并利用RabbitMQ的强大功能,可以构建一个高效、可靠的消息通信系统。无论是物联网设备间的通信还是微服务架构下的服务间消息传递,MQTT和RabbitMQ的组合都提供了一个强有力的解决方案。在实际应用中,应根据具体需求和环境进行适当的配置和优化,以发挥出这两个技术的最大效能。
233 0
|
5月前
|
消息中间件 存储 监控
RabbitMQ、Kafka对比(超详细),Kafka、RabbitMQ、RocketMQ的区别
RabbitMQ、Kafka对比(超详细),Kafka、RabbitMQ、RocketMQ的区别,设计目标、适用场景、吞吐量、消息存储和持久化、可靠性、集群负载均衡
RabbitMQ、Kafka对比(超详细),Kafka、RabbitMQ、RocketMQ的区别
|
6月前
|
消息中间件 API 开发工具
消息队列 MQ使用问题之如何开启RabbitMQ的MQTT功能
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
6月前
|
消息中间件 小程序 RocketMQ
消息队列 MQ使用问题之如何在小程序中引用paho-mqtt
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
6月前
|
消息中间件 RocketMQ
消息队列 MQ使用问题之如何使用SockJS和Stomp与RabbitMQ建立连接
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。