springboot集成mqtt

简介: springboot集成mqtt

springboot集成mqtt

1. 前言

这里我们使用springboot搭建一个轻量级的mqtt客户端,连接mqtt的Broker服务。

连接信息写在配置文件里application.properties

spring.mqtt.username=admin
spring.mqtt.mqpassword=admin
spring.mqtt.host-url= tcp://127.0.0.1:1883
spring.mqtt.client-id= server_client_${random.value}
spring.mqtt.default-topic= $SYS/brokers/+/clients/#
spring.mqtt.completionTimeout= 3000
spring.mqtt.keepAlive= 60

2. 引入依赖

       <!--mqtt -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-integration</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-mqtt</artifactId>
        </dependency>

3. 配置文件

新建MqttProperties.java文件,初始化application里的mqtt配置项

/**
 * @author Eric
 * @date 2020年5月14日
 */
@ConfigurationProperties("spring.mqtt")
@Component
@Getter
@Setter
public class MqttProperties {
    private String username;
    private String mqpassword;
    private String hostUrl;
    private String clientId;
    private String defaultTopic;
    private String completionTimeout;
    private Integer keepAlive;
}

新建MqttConfiguration.java文件,为mqtt做初始化配置

/**
 * @author Eric
 * @date 2020年5月14日
 */
@Configuration
@Slf4j
public class MqttConfiguration {
    @Autowired
    private MqttProperties mqttProperties;
    /**
     * 事件触发
     */
    @Autowired
    private ApplicationEventPublisher eventPublisher;
    @Bean
    public MqttConnectOptions getMqttConnectOptions(){
        MqttConnectOptions mqttConnectOptions=new MqttConnectOptions();
        mqttConnectOptions.setUserName(mqttProperties.getUsername());
        mqttConnectOptions.setPassword(mqttProperties.getMqpassword().toCharArray());
        mqttConnectOptions.setServerURIs(new String[]{mqttProperties.getHostUrl()});
        mqttConnectOptions.setKeepAliveInterval(2);
        mqttConnectOptions.setKeepAliveInterval(mqttProperties.getKeepAlive());
        return mqttConnectOptions;
    }
    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setConnectionOptions(getMqttConnectOptions());
        return factory;
    }
    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }
    /**
     * 配置client,监听的topic
     */
    @Bean
    public MessageProducer inbound() {
        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter(mqttProperties.getClientId()+"_inbound", mqttClientFactory(),
                        mqttProperties.getDefaultTopic().split(","));
        adapter.setCompletionTimeout(Long.valueOf(mqttProperties.getCompletionTimeout()));
        adapter.setConverter(new DefaultPahoMessageConverter());
        //默认添加TopicName中所有tipic
        adapter.addTopic("+/+/test");
        adapter.setQos(2);
        adapter.setOutputChannel(mqttInputChannel());
        return adapter;
    }
    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler handler() {
        return new MessageHandler() {
            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                String topic = message.getHeaders().get("mqtt_receivedTopic").toString();
                String qos = message.getHeaders().get("mqtt_receivedQos").toString();
                //触发事件 这里不再做业务处理,包 listener中做处理
                eventPublisher.publishEvent(new MqttEvent(this,topic,message.getPayload().toString()));
            }
        };
    }
    /**
     * 发送消息和消费消息Channel可以使用相同MqttPahoClientFactory
     *
     * @return
     */
    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound() {
        // 在这里进行mqttOutboundChannel的相关设置
        MqttPahoMessageHandler messageHandler =  new MqttPahoMessageHandler(mqttProperties.getClientId(), mqttClientFactory());
        // 如果设置成true,发送消息时将不会阻塞。
        messageHandler.setAsync(true);
        messageHandler.setDefaultTopic(mqttProperties.getDefaultTopic());
        return messageHandler;
    }
    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }
}

4. MQTT消息类

新建MqttEvent.java 消息类。用于发送mqtt的消息

/**
 *  触发event topic 事件
 * @author Eric
 * @date 2020年5月23日
 */
@Slf4j
@Component
public class JobListener {
    @Autowired
    DeviceDao deviceDao;
    /**
     * 监听topic
     * @param mqttEvent
     */
    @EventListener(condition = "#mqttEvent.topic.startsWith('pay')")
    public void onEmqttCall1(MqttEvent mqttEvent) throws Exception {
        String topic = mqttEvent.getTopic();
    //写逻辑处理
    }
    /**
     * 监听topic
     * @param mqttEvent
     */
    @EventListener(condition = "#mqttEvent.topic.equals('device')")
    public void onEmqttCallT(MqttEvent mqttEvent){
        log.info("接收到消11111111111:"+mqttEvent.getMessage());
    }
}

6. MQTT消息发送器

新建MqttGateway.java 提供发送mqttt消息的接口服务

/**
 *  触发event topic 事件
 * @author Eric
 * @date 2020年5月23日
 */
@Component
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateway {
    void sendToMqtt(String data);
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload);
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
}

7. 测试MQTT发送消息

/**
 * @version 1.0
 * @author: eric
 * @date: 2022/7/1 上午 11:03
 */
@SpringBootTest
public class Test3 {
    @Autowired
    MqttGateway mqttGateway;
    @Test
    public void mqttTest () {
      mqttGateway.sendToMqtt("111//222/33","消息内容");
    }
}


相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
1月前
|
Java Maven Docker
gitlab-ci 集成 k3s 部署spring boot 应用
gitlab-ci 集成 k3s 部署spring boot 应用
|
3月前
|
消息中间件 弹性计算 Kubernetes
RabbitMQ与容器化技术的集成实践
【8月更文第28天】RabbitMQ 是一个开源消息代理和队列服务器,用于在分布式系统中存储、转发消息。随着微服务架构的普及,容器化技术(如 Docker 和 Kubernetes)成为了部署和管理应用程序的标准方式。本文将探讨如何使用 Docker 和 Kubernetes 在生产环境中部署和管理 RabbitMQ 服务,同时保证高可用性和弹性伸缩能力。
60 3
|
24天前
|
消息中间件 Java 数据库
新版 Seata 集成 RocketMQ事务消息,越来越 牛X 了!阿里的 Seata , yyds !
这里 借助 Seata 集成 RocketMQ 事务消息的 新功能,介绍一下一个新遇到的面试题:如果如何实现 **强弱一致性 结合**的分布式事务?
新版 Seata 集成 RocketMQ事务消息,越来越 牛X 了!阿里的 Seata , yyds !
|
4天前
|
XML Java 数据库连接
SpringBoot集成Flowable:打造强大的工作流管理系统
在企业级应用开发中,工作流管理是一个核心组件,它能够帮助我们定义、执行和管理业务流程。Flowable是一个开源的工作流和业务流程管理(BPM)平台,它提供了强大的工作流引擎和建模工具。结合SpringBoot,我们可以快速构建一个高效、灵活的工作流管理系统。本文将探讨如何将Flowable集成到SpringBoot应用中,并展示其强大的功能。
19 1
|
13天前
|
JSON Java API
springboot集成ElasticSearch使用completion实现补全功能
springboot集成ElasticSearch使用completion实现补全功能
19 1
|
4天前
|
XML 存储 Java
SpringBoot集成Flowable:构建强大的工作流引擎
在企业级应用开发中,工作流管理是核心功能之一。Flowable是一个开源的工作流引擎,它提供了BPMN 2.0规范的实现,并且与SpringBoot框架完美集成。本文将探讨如何使用SpringBoot和Flowable构建一个强大的工作流引擎,并分享一些实践技巧。
15 0
|
29天前
|
前端开发 Java 程序员
springboot 学习十五:Spring Boot 优雅的集成Swagger2、Knife4j
这篇文章是关于如何在Spring Boot项目中集成Swagger2和Knife4j来生成和美化API接口文档的详细教程。
49 1
|
1月前
|
存储 前端开发 Java
Spring Boot 集成 MinIO 与 KKFile 实现文件预览功能
本文详细介绍如何在Spring Boot项目中集成MinIO对象存储系统与KKFileView文件预览工具,实现文件上传及在线预览功能。首先搭建MinIO服务器,并在Spring Boot中配置MinIO SDK进行文件管理;接着通过KKFileView提供文件预览服务,最终实现文档管理系统的高效文件处理能力。
241 11
|
2月前
|
XML Java 关系型数据库
springboot 集成 mybatis-plus 代码生成器
本文介绍了如何在Spring Boot项目中集成MyBatis-Plus代码生成器,包括导入相关依赖坐标、配置快速代码生成器以及自定义代码生成器模板的步骤和代码示例,旨在提高开发效率,快速生成Entity、Mapper、Mapper XML、Service、Controller等代码。
springboot 集成 mybatis-plus 代码生成器
|
2月前
|
Java Spring
springboot 集成 swagger 2.x 和 3.0 以及 Failed to start bean ‘documentationPluginsBootstrapper‘问题的解决
本文介绍了如何在Spring Boot项目中集成Swagger 2.x和3.0版本,并提供了解决Swagger在Spring Boot中启动失败问题“Failed to start bean ‘documentationPluginsBootstrapper’; nested exception is java.lang.NullPointerEx”的方法,包括配置yml文件和Spring Boot版本的降级。
springboot 集成 swagger 2.x 和 3.0 以及 Failed to start bean ‘documentationPluginsBootstrapper‘问题的解决
下一篇
无影云桌面