一、依赖引用
<!-- Spring Integration MQTT module -->
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
    <version>5.5.9</version>
</dependency>
<!-- Eclipse Paho MQTT v3 client -->
<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    <version>1.2.5</version>
</dependency>
二、配置类
包含接收消息的配置和发送消息的配置
package com.demo.config;

import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;

import java.util.UUID;

/**
 * MQTT connection configuration.
 *
 * <p>Declares the client factory plus the inbound (subscribe) and outbound (publish)
 * channels, adapters and handlers.
 */
@Configuration
public class MqttConfig {

    /**
     * 1. Creates the client factory that carries the broker connection options.
     *
     * @return a factory configured with the server URI and credentials below
     */
    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions options = new MqttConnectOptions();
        // MQTT user name and password (placeholders; fill in real credentials).
        String userName = "";
        String pwd = "";
        // MQTT server address; more than one URI may be listed.
        options.setServerURIs(new String[] {"tcp://server:1883"});
        options.setUserName(userName);
        options.setPassword(pwd.toCharArray());
        factory.setConnectionOptions(options);
        return factory;
    }

    /**
     * 2. Channel that carries messages received from the broker.
     *
     * @return a new {@link DirectChannel}
     */
    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }

    /**
     * Inbound adapter: subscribes to the topic filter and forwards received messages
     * to {@link #mqttInputChannel()}.
     *
     * @return the configured message-driven channel adapter
     */
    @Bean
    public MessageProducer inbound() {
        // Random client id so that this subscriber's id is unique.
        String inClientId = UUID.randomUUID().toString().replace("-", "");
        // The trailing '#' is the MQTT multi-level wildcard.
        String[] topic = {"topic_prefix/topic/#"};
        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter(inClientId, mqttClientFactory(), topic);
        adapter.setCompletionTimeout(5000);
        DefaultPahoMessageConverter defaultPahoMessageConverter = new DefaultPahoMessageConverter();
        // To receive payloads as byte[] instead of String, enable:
        //   defaultPahoMessageConverter.setPayloadAsBytes(true);
        adapter.setConverter(defaultPahoMessageConverter);
        // Subscribe with QoS 1.
        adapter.setQos(1);
        adapter.setOutputChannel(mqttInputChannel());
        return adapter;
    }

    /**
     * 3. Message handling. The {@code ServiceActivator} annotation marks this handler as the
     * consumer of MQTT messages; {@code inputChannel} names the channel it consumes from.
     *
     * @return a handler that prints the received topic and payload
     */
    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler handler() {
        return message -> {
            String payload = message.getPayload().toString();
            // When the converter is switched to byte payloads, read it as:
            //   byte[] bytes = (byte[]) message.getPayload();
            // String.valueOf avoids a NullPointerException if the header is absent.
            String topic = String.valueOf(message.getHeaders().get("mqtt_receivedTopic"));
            // Different business types can be dispatched here based on the topic.
            System.out.println("主题["+topic+"],负载:"+payload);
        };
    }

    /**
     * Channel that carries messages to be published to the broker.
     *
     * @return a new {@link DirectChannel}
     */
    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }

    /**
     * Outbound handler: publishes messages arriving on {@code mqttOutboundChannel}.
     *
     * @return the configured MQTT message handler
     */
    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler outbound() {
        // Random client id so that this publisher's id is unique.
        String outClientId = UUID.randomUUID().toString().replace("-", "");
        // Publishing and consuming can share the same MqttPahoClientFactory.
        MqttPahoMessageHandler messageHandler =
                new MqttPahoMessageHandler(outClientId, mqttClientFactory());
        // Optional: asynchronous sending, so that send does not block:
        //   messageHandler.setAsync(true);
        // Optional: default topic for messages that carry no topic header:
        //   messageHandler.setDefaultTopic("defaultTopic");
        // Default QoS for messages that do not specify one.
        messageHandler.setDefaultQos(1);
        // Paho message converter.
        DefaultPahoMessageConverter defaultPahoMessageConverter = new DefaultPahoMessageConverter();
        // Optional: send payloads as bytes:
        //   defaultPahoMessageConverter.setPayloadAsBytes(true);
        messageHandler.setConverter(defaultPahoMessageConverter);
        return messageHandler;
    }
}
三、消息发送
1. 定义消息发送的接口
packagecom.demo.config; importorg.springframework.integration.annotation.MessagingGateway; importorg.springframework.integration.mqtt.support.MqttHeaders; importorg.springframework.messaging.handler.annotation.Header; /*** 定义消息发送的接口*/defaultRequestChannel="mqttOutboundChannel") (publicinterfaceMqttGateWay { /*** 发送消息** @param payload 发送的消息*/voidsendToMqtt(Stringpayload); /*** 指定topic消息发送** @param topic 指定topic* @param payload 消息*/voidsendToMqtt( (MqttHeaders.TOPIC) Stringtopic, Stringpayload); voidsendToMqtt( (MqttHeaders.TOPIC) Stringtopic, (MqttHeaders.QOS) intqos, Stringpayload); voidsendToMqtt( (MqttHeaders.TOPIC) Stringtopic, (MqttHeaders.QOS) intqos, byte[] payload); }
2. 定义消息发送的controller
packagecom.demo.business; importcom.sonli.config.MqttGateWay; importorg.springframework.beans.factory.annotation.Autowired; importorg.springframework.web.bind.annotation.PostMapping; importorg.springframework.web.bind.annotation.RequestMapping; importorg.springframework.web.bind.annotation.RestController; /*** 对外暴露发送消息的controller*/"/mqtt") (publicclassMqttController { privateMqttGateWaymqttGateWay; "/sendMessage") (publicStringsendMessage(Stringtopic, Stringmessage) { // 发送消息到指定topicmqttGateWay.sendToMqtt(topic, 1, message); return"send topic: "+topic+", message : "+message; } }
3. 测试
自己发送,自己监听
3.1 发送消息
3.2 消息的监听,收到的消息