springboot集成mqtt
1. 前言
这里我们使用springboot搭建一个轻量级的mqtt客户端,连接mqtt的Broker服务。
连接信息写在配置文件里application.properties
spring.mqtt.username=admin spring.mqtt.mqpassword=admin spring.mqtt.host-url= tcp://127.0.0.1:1883 spring.mqtt.client-id= server_client_${random.value} spring.mqtt.default-topic= $SYS/brokers/+/clients/# spring.mqtt.completionTimeout= 3000 spring.mqtt.keepAlive= 60
2. 引入依赖
<!--mqtt --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-integration</artifactId> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-stream</artifactId> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-mqtt</artifactId> </dependency>
3. 配置文件
新建MqttProperties.java文件,初始化application里的mqtt配置项
/** * @author Eric * @date 2020年5月14日 */ @ConfigurationProperties("spring.mqtt") @Component @Getter @Setter public class MqttProperties { private String username; private String mqpassword; private String hostUrl; private String clientId; private String defaultTopic; private String completionTimeout; private Integer keepAlive; }
新建MqttConfiguration.java文件,为mqtt做初始化配置
/** * @author Eric * @date 2020年5月14日 */ @Configuration @Slf4j public class MqttConfiguration { @Autowired private MqttProperties mqttProperties; /** * 事件触发 */ @Autowired private ApplicationEventPublisher eventPublisher; @Bean public MqttConnectOptions getMqttConnectOptions(){ MqttConnectOptions mqttConnectOptions=new MqttConnectOptions(); mqttConnectOptions.setUserName(mqttProperties.getUsername()); mqttConnectOptions.setPassword(mqttProperties.getMqpassword().toCharArray()); mqttConnectOptions.setServerURIs(new String[]{mqttProperties.getHostUrl()}); mqttConnectOptions.setKeepAliveInterval(2); mqttConnectOptions.setKeepAliveInterval(mqttProperties.getKeepAlive()); return mqttConnectOptions; } @Bean public MqttPahoClientFactory mqttClientFactory() { DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); factory.setConnectionOptions(getMqttConnectOptions()); return factory; } @Bean public MessageChannel mqttInputChannel() { return new DirectChannel(); } /** * 配置client,监听的topic */ @Bean public MessageProducer inbound() { MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(mqttProperties.getClientId()+"_inbound", mqttClientFactory(), mqttProperties.getDefaultTopic().split(",")); adapter.setCompletionTimeout(Long.valueOf(mqttProperties.getCompletionTimeout())); adapter.setConverter(new DefaultPahoMessageConverter()); //默认添加TopicName中所有tipic adapter.addTopic("+/+/test"); adapter.setQos(2); adapter.setOutputChannel(mqttInputChannel()); return adapter; } @Bean @ServiceActivator(inputChannel = "mqttInputChannel") public MessageHandler handler() { return new MessageHandler() { @Override public void handleMessage(Message<?> message) throws MessagingException { String topic = message.getHeaders().get("mqtt_receivedTopic").toString(); String qos = message.getHeaders().get("mqtt_receivedQos").toString(); //触发事件 这里不再做业务处理,包 listener中做处理 eventPublisher.publishEvent(new MqttEvent(this,topic,message.getPayload().toString())); } }; } /** * 发送消息和消费消息Channel可以使用相同MqttPahoClientFactory * * @return */ @Bean @ServiceActivator(inputChannel = "mqttOutboundChannel") public MessageHandler mqttOutbound() { // 在这里进行mqttOutboundChannel的相关设置 MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(mqttProperties.getClientId(), mqttClientFactory()); // 如果设置成true,发送消息时将不会阻塞。 messageHandler.setAsync(true); messageHandler.setDefaultTopic(mqttProperties.getDefaultTopic()); return messageHandler; } @Bean public MessageChannel mqttOutboundChannel() { return new DirectChannel(); } }
4. MQTT消息类
新建MqttEvent.java 消息类。用于发送mqtt的消息
/** * 触发event topic 事件 * @author Eric * @date 2020年5月23日 */ @Slf4j @Component public class JobListener { @Autowired DeviceDao deviceDao; /** * 监听topic * @param mqttEvent */ @EventListener(condition = "#mqttEvent.topic.startsWith('pay')") public void onEmqttCall1(MqttEvent mqttEvent) throws Exception { String topic = mqttEvent.getTopic(); //写逻辑处理 } /** * 监听topic * @param mqttEvent */ @EventListener(condition = "#mqttEvent.topic.equals('device')") public void onEmqttCallT(MqttEvent mqttEvent){ log.info("接收到消11111111111:"+mqttEvent.getMessage()); } }
6. MQTT消息发送器
新建MqttGateway.java 提供发送mqttt消息的接口服务
/** * 触发event topic 事件 * @author Eric * @date 2020年5月23日 */ @Component @MessagingGateway(defaultRequestChannel = "mqttOutboundChannel") public interface MqttGateway { void sendToMqtt(String data); void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload); void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload); }
7. 测试MQTT发送消息
/** * @version 1.0 * @author: eric * @date: 2022/7/1 上午 11:03 */ @SpringBootTest public class Test3 { @Autowired MqttGateway mqttGateway; @Test public void mqttTest () { mqttGateway.sendToMqtt("111//222/33","消息内容"); } }