SpringBoot集成Mqtt

简介: 关于SpringBoot集成mqtt

一、依赖引用

<dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-mqtt</artifactId><version>5.5.9</version></dependency><dependency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.client.mqttv3</artifactId><version>1.2.5</version></dependency>

二、配置类

包含接收消息的配置和发送消息的配置

packagecom.demo.config;
importorg.eclipse.paho.client.mqttv3.MqttConnectOptions;
importorg.springframework.context.annotation.Bean;
importorg.springframework.context.annotation.Configuration;
importorg.springframework.integration.annotation.ServiceActivator;
importorg.springframework.integration.channel.DirectChannel;
importorg.springframework.integration.core.MessageProducer;
importorg.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
importorg.springframework.integration.mqtt.core.MqttPahoClientFactory;
importorg.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
importorg.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
importorg.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
importorg.springframework.messaging.MessageChannel;
importorg.springframework.messaging.MessageHandler;
importjava.util.UUID;
/*** mqtt连接配置*/@ConfigurationpublicclassMqttConfig {
/*** 创建连接** @return*/@BeanpublicMqttPahoClientFactorymqttClientFactory() {
DefaultMqttPahoClientFactoryfactory=newDefaultMqttPahoClientFactory();
MqttConnectOptionsoptions=newMqttConnectOptions();
// mqtt用户名&密码StringuserName="";
Stringpwd="";
// mqtt服务地址,可以是多个options.setServerURIs(newString[]{"tcp://server:1883"});
options.setUserName(userName);
options.setPassword(pwd.toCharArray());
factory.setConnectionOptions(options);
returnfactory;
    }
/*** 2、接收消息的通道*/@BeanpublicMessageChannelmqttInputChannel() {
returnnewDirectChannel();
    }
/*** 接收消息** @return*/@BeanpublicMessageProducerinbound() {
// 订阅主题,保证唯一性StringinClientId=UUID.randomUUID().toString().replaceAll("-", "");
// 最后的#相当于通配符的概念String[] topic= {"topic_prefix/topic/#"};
MqttPahoMessageDrivenChannelAdapteradapter=newMqttPahoMessageDrivenChannelAdapter(
inClientId,
mqttClientFactory(),
topic);
adapter.setCompletionTimeout(5000);
DefaultPahoMessageConverterdefaultPahoMessageConverter=newDefaultPahoMessageConverter();
// 按字节接收消息//        defaultPahoMessageConverter.setPayloadAsBytes(true);adapter.setConverter(defaultPahoMessageConverter);
// 设置QoSadapter.setQos(1);
adapter.setOutputChannel(mqttInputChannel());
returnadapter;
    }
/*** 3、消息处理* ServiceActivator注解表明:当前方法用于处理MQTT消息,inputChannel参数指定了用于消费消息的channel*/@Bean@ServiceActivator(inputChannel="mqttInputChannel")
publicMessageHandlerhandler() {
returnmessage-> {
Stringpayload=message.getPayload().toString();
// byte[] bytes = (byte[]) message.getPayload(); // 收到的消息是字节格式Stringtopic=message.getHeaders().get("mqtt_receivedTopic").toString();
// 可以根据topic进行处理不同的业务类型System.out.println("主题["+topic+"],负载:"+payload);
        };
    }
/*** 发送消息的通道** @return*/@BeanpublicMessageChannelmqttOutboundChannel() {
returnnewDirectChannel();
    }
/*** 发送消息*/@Bean@ServiceActivator(inputChannel="mqttOutboundChannel")
publicMessageHandleroutbound() {
// 连接clientId保证唯一StringoutClientId=UUID.randomUUID().toString().replaceAll("-", "");
// 发送消息和消费消息Channel可以使用相同MqttPahoClientFactoryMqttPahoMessageHandlermessageHandler=newMqttPahoMessageHandler(outClientId, mqttClientFactory());
// 如果设置成true,即异步,发送消息时将不会阻塞。// messageHandler.setAsync(true);// 设置默认的topic// messageHandler.setDefaultTopic("defaultTopic");// 设置默认QoSmessageHandler.setDefaultQos(1);
// Paho消息转换器DefaultPahoMessageConverterdefaultPahoMessageConverter=newDefaultPahoMessageConverter();
// 发送默认按字节类型发送消息// defaultPahoMessageConverter.setPayloadAsBytes(true);messageHandler.setConverter(defaultPahoMessageConverter);
returnmessageHandler;
    }
}

三、消息发送

1. 定义消息发送的接口

packagecom.demo.config;
importorg.springframework.integration.annotation.MessagingGateway;
importorg.springframework.integration.mqtt.support.MqttHeaders;
importorg.springframework.messaging.handler.annotation.Header;
/*** 定义消息发送的接口*/@MessagingGateway(defaultRequestChannel="mqttOutboundChannel")
publicinterfaceMqttGateWay {
/*** 发送消息** @param payload 发送的消息*/voidsendToMqtt(Stringpayload);
/*** 指定topic消息发送** @param topic   指定topic* @param payload 消息*/voidsendToMqtt(@Header(MqttHeaders.TOPIC) Stringtopic, Stringpayload);
voidsendToMqtt(@Header(MqttHeaders.TOPIC) Stringtopic, @Header(MqttHeaders.QOS) intqos, Stringpayload);
voidsendToMqtt(@Header(MqttHeaders.TOPIC) Stringtopic, @Header(MqttHeaders.QOS) intqos, byte[] payload);
}

2. 定义消息发送的controller

packagecom.demo.business;
importcom.sonli.config.MqttGateWay;
importorg.springframework.beans.factory.annotation.Autowired;
importorg.springframework.web.bind.annotation.PostMapping;
importorg.springframework.web.bind.annotation.RequestMapping;
importorg.springframework.web.bind.annotation.RestController;
/*** 对外暴露发送消息的controller*/@RestController@RequestMapping("/mqtt")
publicclassMqttController {
@AutowiredprivateMqttGateWaymqttGateWay;
@PostMapping("/sendMessage")
publicStringsendMessage(Stringtopic, Stringmessage) {
// 发送消息到指定topicmqttGateWay.sendToMqtt(topic, 1, message);
return"send topic: "+topic+", message : "+message;
    }
}

3. 测试

自己发送,自己监听

3.1 发送消息

image.png

3.2 消息的监听,收到的消息

image.png

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
1月前
|
Java Maven Docker
gitlab-ci 集成 k3s 部署spring boot 应用
gitlab-ci 集成 k3s 部署spring boot 应用
|
3月前
|
消息中间件 弹性计算 Kubernetes
RabbitMQ与容器化技术的集成实践
【8月更文第28天】RabbitMQ 是一个开源消息代理和队列服务器,用于在分布式系统中存储、转发消息。随着微服务架构的普及,容器化技术(如 Docker 和 Kubernetes)成为了部署和管理应用程序的标准方式。本文将探讨如何使用 Docker 和 Kubernetes 在生产环境中部署和管理 RabbitMQ 服务,同时保证高可用性和弹性伸缩能力。
66 3
|
1月前
|
消息中间件 Java 数据库
新版 Seata 集成 RocketMQ事务消息,越来越 牛X 了!阿里的 Seata , yyds !
这里 借助 Seata 集成 RocketMQ 事务消息的 新功能,介绍一下一个新遇到的面试题:如果如何实现 **强弱一致性 结合**的分布式事务?
新版 Seata 集成 RocketMQ事务消息,越来越 牛X 了!阿里的 Seata , yyds !
|
14天前
|
XML Java 数据库连接
SpringBoot集成Flowable:打造强大的工作流管理系统
在企业级应用开发中,工作流管理是一个核心组件,它能够帮助我们定义、执行和管理业务流程。Flowable是一个开源的工作流和业务流程管理(BPM)平台,它提供了强大的工作流引擎和建模工具。结合SpringBoot,我们可以快速构建一个高效、灵活的工作流管理系统。本文将探讨如何将Flowable集成到SpringBoot应用中,并展示其强大的功能。
55 1
|
23天前
|
JSON Java API
springboot集成ElasticSearch使用completion实现补全功能
springboot集成ElasticSearch使用completion实现补全功能
24 1
|
14天前
|
XML 存储 Java
SpringBoot集成Flowable:构建强大的工作流引擎
在企业级应用开发中,工作流管理是核心功能之一。Flowable是一个开源的工作流引擎,它提供了BPMN 2.0规范的实现,并且与SpringBoot框架完美集成。本文将探讨如何使用SpringBoot和Flowable构建一个强大的工作流引擎,并分享一些实践技巧。
39 0
|
1月前
|
前端开发 Java 程序员
springboot 学习十五:Spring Boot 优雅的集成Swagger2、Knife4j
这篇文章是关于如何在Spring Boot项目中集成Swagger2和Knife4j来生成和美化API接口文档的详细教程。
103 1
|
1月前
|
存储 前端开发 Java
Spring Boot 集成 MinIO 与 KKFile 实现文件预览功能
本文详细介绍如何在Spring Boot项目中集成MinIO对象存储系统与KKFileView文件预览工具,实现文件上传及在线预览功能。首先搭建MinIO服务器,并在Spring Boot中配置MinIO SDK进行文件管理;接着通过KKFileView提供文件预览服务,最终实现文档管理系统的高效文件处理能力。
278 11
|
2月前
|
XML Java 关系型数据库
springboot 集成 mybatis-plus 代码生成器
本文介绍了如何在Spring Boot项目中集成MyBatis-Plus代码生成器,包括导入相关依赖坐标、配置快速代码生成器以及自定义代码生成器模板的步骤和代码示例,旨在提高开发效率,快速生成Entity、Mapper、Mapper XML、Service、Controller等代码。
springboot 集成 mybatis-plus 代码生成器
|
2月前
|
Java Spring
springboot 集成 swagger 2.x 和 3.0 以及 Failed to start bean ‘documentationPluginsBootstrapper‘问题的解决
本文介绍了如何在Spring Boot项目中集成Swagger 2.x和3.0版本,并提供了解决Swagger在Spring Boot中启动失败问题“Failed to start bean ‘documentationPluginsBootstrapper’; nested exception is java.lang.NullPointerEx”的方法,包括配置yml文件和Spring Boot版本的降级。
springboot 集成 swagger 2.x 和 3.0 以及 Failed to start bean ‘documentationPluginsBootstrapper‘问题的解决

热门文章

最新文章

下一篇
无影云桌面