SpringBoot集成Mqtt

简介: 关于SpringBoot集成mqtt

一、依赖引用

<dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-mqtt</artifactId><version>5.5.9</version></dependency><dependency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.client.mqttv3</artifactId><version>1.2.5</version></dependency>

二、配置类

包含接收消息的配置和发送消息的配置

packagecom.demo.config;
importorg.eclipse.paho.client.mqttv3.MqttConnectOptions;
importorg.springframework.context.annotation.Bean;
importorg.springframework.context.annotation.Configuration;
importorg.springframework.integration.annotation.ServiceActivator;
importorg.springframework.integration.channel.DirectChannel;
importorg.springframework.integration.core.MessageProducer;
importorg.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
importorg.springframework.integration.mqtt.core.MqttPahoClientFactory;
importorg.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
importorg.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
importorg.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
importorg.springframework.messaging.MessageChannel;
importorg.springframework.messaging.MessageHandler;
importjava.util.UUID;
/*** mqtt连接配置*/@ConfigurationpublicclassMqttConfig {
/*** 创建连接** @return*/@BeanpublicMqttPahoClientFactorymqttClientFactory() {
DefaultMqttPahoClientFactoryfactory=newDefaultMqttPahoClientFactory();
MqttConnectOptionsoptions=newMqttConnectOptions();
// mqtt用户名&密码StringuserName="";
Stringpwd="";
// mqtt服务地址,可以是多个options.setServerURIs(newString[]{"tcp://server:1883"});
options.setUserName(userName);
options.setPassword(pwd.toCharArray());
factory.setConnectionOptions(options);
returnfactory;
    }
/*** 2、接收消息的通道*/@BeanpublicMessageChannelmqttInputChannel() {
returnnewDirectChannel();
    }
/*** 接收消息** @return*/@BeanpublicMessageProducerinbound() {
// 订阅主题,保证唯一性StringinClientId=UUID.randomUUID().toString().replaceAll("-", "");
// 最后的#相当于通配符的概念String[] topic= {"topic_prefix/topic/#"};
MqttPahoMessageDrivenChannelAdapteradapter=newMqttPahoMessageDrivenChannelAdapter(
inClientId,
mqttClientFactory(),
topic);
adapter.setCompletionTimeout(5000);
DefaultPahoMessageConverterdefaultPahoMessageConverter=newDefaultPahoMessageConverter();
// 按字节接收消息//        defaultPahoMessageConverter.setPayloadAsBytes(true);adapter.setConverter(defaultPahoMessageConverter);
// 设置QoSadapter.setQos(1);
adapter.setOutputChannel(mqttInputChannel());
returnadapter;
    }
/*** 3、消息处理* ServiceActivator注解表明:当前方法用于处理MQTT消息,inputChannel参数指定了用于消费消息的channel*/@Bean@ServiceActivator(inputChannel="mqttInputChannel")
publicMessageHandlerhandler() {
returnmessage-> {
Stringpayload=message.getPayload().toString();
// byte[] bytes = (byte[]) message.getPayload(); // 收到的消息是字节格式Stringtopic=message.getHeaders().get("mqtt_receivedTopic").toString();
// 可以根据topic进行处理不同的业务类型System.out.println("主题["+topic+"],负载:"+payload);
        };
    }
/*** 发送消息的通道** @return*/@BeanpublicMessageChannelmqttOutboundChannel() {
returnnewDirectChannel();
    }
/*** 发送消息*/@Bean@ServiceActivator(inputChannel="mqttOutboundChannel")
publicMessageHandleroutbound() {
// 连接clientId保证唯一StringoutClientId=UUID.randomUUID().toString().replaceAll("-", "");
// 发送消息和消费消息Channel可以使用相同MqttPahoClientFactoryMqttPahoMessageHandlermessageHandler=newMqttPahoMessageHandler(outClientId, mqttClientFactory());
// 如果设置成true,即异步,发送消息时将不会阻塞。// messageHandler.setAsync(true);// 设置默认的topic// messageHandler.setDefaultTopic("defaultTopic");// 设置默认QoSmessageHandler.setDefaultQos(1);
// Paho消息转换器DefaultPahoMessageConverterdefaultPahoMessageConverter=newDefaultPahoMessageConverter();
// 发送默认按字节类型发送消息// defaultPahoMessageConverter.setPayloadAsBytes(true);messageHandler.setConverter(defaultPahoMessageConverter);
returnmessageHandler;
    }
}

三、消息发送

1. 定义消息发送的接口

packagecom.demo.config;
importorg.springframework.integration.annotation.MessagingGateway;
importorg.springframework.integration.mqtt.support.MqttHeaders;
importorg.springframework.messaging.handler.annotation.Header;
/*** 定义消息发送的接口*/@MessagingGateway(defaultRequestChannel="mqttOutboundChannel")
publicinterfaceMqttGateWay {
/*** 发送消息** @param payload 发送的消息*/voidsendToMqtt(Stringpayload);
/*** 指定topic消息发送** @param topic   指定topic* @param payload 消息*/voidsendToMqtt(@Header(MqttHeaders.TOPIC) Stringtopic, Stringpayload);
voidsendToMqtt(@Header(MqttHeaders.TOPIC) Stringtopic, @Header(MqttHeaders.QOS) intqos, Stringpayload);
voidsendToMqtt(@Header(MqttHeaders.TOPIC) Stringtopic, @Header(MqttHeaders.QOS) intqos, byte[] payload);
}

2. 定义消息发送的controller

packagecom.demo.business;
importcom.sonli.config.MqttGateWay;
importorg.springframework.beans.factory.annotation.Autowired;
importorg.springframework.web.bind.annotation.PostMapping;
importorg.springframework.web.bind.annotation.RequestMapping;
importorg.springframework.web.bind.annotation.RestController;
/*** 对外暴露发送消息的controller*/@RestController@RequestMapping("/mqtt")
publicclassMqttController {
@AutowiredprivateMqttGateWaymqttGateWay;
@PostMapping("/sendMessage")
publicStringsendMessage(Stringtopic, Stringmessage) {
// 发送消息到指定topicmqttGateWay.sendToMqtt(topic, 1, message);
return"send topic: "+topic+", message : "+message;
    }
}

3. 测试

自己发送,自己监听

3.1 发送消息

image.png

3.2 消息的监听,收到的消息

image.png

相关实践学习
快速体验阿里云云消息队列RocketMQ版
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
5月前
|
消息中间件 Ubuntu Java
SpringBoot整合MQTT实战:基于EMQX实现双向设备通信
本教程指导在Ubuntu上部署EMQX 5.9.0并集成Spring Boot实现MQTT双向通信,涵盖服务器搭建、客户端配置及生产实践,助您快速构建企业级物联网消息系统。
2134 1
|
7月前
|
JSON 分布式计算 大数据
springboot项目集成大数据第三方dolphinscheduler调度器
springboot项目集成大数据第三方dolphinscheduler调度器
452 3
|
7月前
|
缓存 JSON 前端开发
第07课:Spring Boot集成Thymeleaf模板引擎
第07课:Spring Boot集成Thymeleaf模板引擎
686 0
第07课:Spring Boot集成Thymeleaf模板引擎
|
7月前
|
Java 关系型数据库 MySQL
springboot项目集成dolphinscheduler调度器 实现datax数据同步任务
springboot项目集成dolphinscheduler调度器 实现datax数据同步任务
751 2
|
7月前
|
分布式计算 Java 大数据
springboot项目集成dolphinscheduler调度器 可拖拽spark任务管理
springboot项目集成dolphinscheduler调度器 可拖拽spark任务管理
417 2
|
7月前
|
物联网 Linux 开发者
快速部署自己私有MQTT-Broker-下载安装到运行不到一分钟,快速简单且易于集成到自己项目中
本文给物联网开发的朋友推荐的是GMQT,让物联网开发者快速拥有合适自己的MQTT-Broker,本文从下载程序到安装部署手把手教大家安装用上私有化MQTT服务器。
1816 5
|
分布式计算 大数据 Java
springboot项目集成大数据第三方dolphinscheduler调度器 执行/停止任务
springboot项目集成大数据第三方dolphinscheduler调度器 执行/停止任务
174 0
|
7月前
|
存储 人工智能 Java
Springboot集成AI Springboot3 集成阿里云百炼大模型CosyVoice2 实现Ai克隆语音(未持久化存储)
本项目基于Spring Boot 3.5.3与Java 17,集成阿里云百炼大模型CosyVoice2实现音色克隆与语音合成。内容涵盖项目搭建、音色创建、音频合成、音色管理等功能,适用于希望快速掌握Spring Boot集成语音AI技术的开发者。需提前注册阿里云并获取API Key。