SpringBoot集成Mqtt

简介: 关于SpringBoot集成mqtt

一、依赖引用

<dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-mqtt</artifactId><version>5.5.9</version></dependency><dependency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.client.mqttv3</artifactId><version>1.2.5</version></dependency>

二、配置类

包含接收消息的配置和发送消息的配置

packagecom.demo.config;
importorg.eclipse.paho.client.mqttv3.MqttConnectOptions;
importorg.springframework.context.annotation.Bean;
importorg.springframework.context.annotation.Configuration;
importorg.springframework.integration.annotation.ServiceActivator;
importorg.springframework.integration.channel.DirectChannel;
importorg.springframework.integration.core.MessageProducer;
importorg.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
importorg.springframework.integration.mqtt.core.MqttPahoClientFactory;
importorg.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
importorg.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
importorg.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
importorg.springframework.messaging.MessageChannel;
importorg.springframework.messaging.MessageHandler;
importjava.util.UUID;
/*** mqtt连接配置*/@ConfigurationpublicclassMqttConfig {
/*** 创建连接** @return*/@BeanpublicMqttPahoClientFactorymqttClientFactory() {
DefaultMqttPahoClientFactoryfactory=newDefaultMqttPahoClientFactory();
MqttConnectOptionsoptions=newMqttConnectOptions();
// mqtt用户名&密码StringuserName="";
Stringpwd="";
// mqtt服务地址,可以是多个options.setServerURIs(newString[]{"tcp://server:1883"});
options.setUserName(userName);
options.setPassword(pwd.toCharArray());
factory.setConnectionOptions(options);
returnfactory;
    }
/*** 2、接收消息的通道*/@BeanpublicMessageChannelmqttInputChannel() {
returnnewDirectChannel();
    }
/*** 接收消息** @return*/@BeanpublicMessageProducerinbound() {
// 订阅主题,保证唯一性StringinClientId=UUID.randomUUID().toString().replaceAll("-", "");
// 最后的#相当于通配符的概念String[] topic= {"topic_prefix/topic/#"};
MqttPahoMessageDrivenChannelAdapteradapter=newMqttPahoMessageDrivenChannelAdapter(
inClientId,
mqttClientFactory(),
topic);
adapter.setCompletionTimeout(5000);
DefaultPahoMessageConverterdefaultPahoMessageConverter=newDefaultPahoMessageConverter();
// 按字节接收消息//        defaultPahoMessageConverter.setPayloadAsBytes(true);adapter.setConverter(defaultPahoMessageConverter);
// 设置QoSadapter.setQos(1);
adapter.setOutputChannel(mqttInputChannel());
returnadapter;
    }
/*** 3、消息处理* ServiceActivator注解表明:当前方法用于处理MQTT消息,inputChannel参数指定了用于消费消息的channel*/@Bean@ServiceActivator(inputChannel="mqttInputChannel")
publicMessageHandlerhandler() {
returnmessage-> {
Stringpayload=message.getPayload().toString();
// byte[] bytes = (byte[]) message.getPayload(); // 收到的消息是字节格式Stringtopic=message.getHeaders().get("mqtt_receivedTopic").toString();
// 可以根据topic进行处理不同的业务类型System.out.println("主题["+topic+"],负载:"+payload);
        };
    }
/*** 发送消息的通道** @return*/@BeanpublicMessageChannelmqttOutboundChannel() {
returnnewDirectChannel();
    }
/*** 发送消息*/@Bean@ServiceActivator(inputChannel="mqttOutboundChannel")
publicMessageHandleroutbound() {
// 连接clientId保证唯一StringoutClientId=UUID.randomUUID().toString().replaceAll("-", "");
// 发送消息和消费消息Channel可以使用相同MqttPahoClientFactoryMqttPahoMessageHandlermessageHandler=newMqttPahoMessageHandler(outClientId, mqttClientFactory());
// 如果设置成true,即异步,发送消息时将不会阻塞。// messageHandler.setAsync(true);// 设置默认的topic// messageHandler.setDefaultTopic("defaultTopic");// 设置默认QoSmessageHandler.setDefaultQos(1);
// Paho消息转换器DefaultPahoMessageConverterdefaultPahoMessageConverter=newDefaultPahoMessageConverter();
// 发送默认按字节类型发送消息// defaultPahoMessageConverter.setPayloadAsBytes(true);messageHandler.setConverter(defaultPahoMessageConverter);
returnmessageHandler;
    }
}

三、消息发送

1. 定义消息发送的接口

packagecom.demo.config;
importorg.springframework.integration.annotation.MessagingGateway;
importorg.springframework.integration.mqtt.support.MqttHeaders;
importorg.springframework.messaging.handler.annotation.Header;
/*** 定义消息发送的接口*/@MessagingGateway(defaultRequestChannel="mqttOutboundChannel")
publicinterfaceMqttGateWay {
/*** 发送消息** @param payload 发送的消息*/voidsendToMqtt(Stringpayload);
/*** 指定topic消息发送** @param topic   指定topic* @param payload 消息*/voidsendToMqtt(@Header(MqttHeaders.TOPIC) Stringtopic, Stringpayload);
voidsendToMqtt(@Header(MqttHeaders.TOPIC) Stringtopic, @Header(MqttHeaders.QOS) intqos, Stringpayload);
voidsendToMqtt(@Header(MqttHeaders.TOPIC) Stringtopic, @Header(MqttHeaders.QOS) intqos, byte[] payload);
}

2. 定义消息发送的controller

packagecom.demo.business;
importcom.sonli.config.MqttGateWay;
importorg.springframework.beans.factory.annotation.Autowired;
importorg.springframework.web.bind.annotation.PostMapping;
importorg.springframework.web.bind.annotation.RequestMapping;
importorg.springframework.web.bind.annotation.RestController;
/*** 对外暴露发送消息的controller*/@RestController@RequestMapping("/mqtt")
publicclassMqttController {
@AutowiredprivateMqttGateWaymqttGateWay;
@PostMapping("/sendMessage")
publicStringsendMessage(Stringtopic, Stringmessage) {
// 发送消息到指定topicmqttGateWay.sendToMqtt(topic, 1, message);
return"send topic: "+topic+", message : "+message;
    }
}

3. 测试

自己发送,自己监听

3.1 发送消息

image.png

3.2 消息的监听,收到的消息

image.png

相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
11天前
|
消息中间件 Java Kafka
Springboot集成高低版本kafka
Springboot集成高低版本kafka
|
17天前
|
NoSQL Java Redis
SpringBoot集成Redis解决表单重复提交接口幂等(亲测可用)
SpringBoot集成Redis解决表单重复提交接口幂等(亲测可用)
51 0
|
22天前
|
NoSQL Java Redis
SpringBoot集成Redis
SpringBoot集成Redis
159 0
|
29天前
|
NoSQL Java Redis
小白版的springboot中集成mqtt服务(超级无敌详细),实现不了掐我头!!!
小白版的springboot中集成mqtt服务(超级无敌详细),实现不了掐我头!!!
230 1
|
1月前
|
消息中间件 Java
springboot整合消息队列——RabbitMQ
springboot整合消息队列——RabbitMQ
74 0
|
11天前
|
SQL Java 调度
SpringBoot集成quartz定时任务trigger_state状态ERROR解决办法
SpringBoot集成quartz定时任务trigger_state状态ERROR解决办法
|
18天前
|
NoSQL Java Redis
SpringBoot集成Redis
SpringBoot集成Redis
41 1
|
29天前
|
缓存 NoSQL Java
springboot中集成redis,二次封装成工具类
springboot中集成redis,二次封装成工具类
162 0
|
1月前
|
前端开发 JavaScript Java
springboot 集成easy-captcha实现图像验证码显示和登录
springboot 集成easy-captcha实现图像验证码显示和登录
135 0