代码已上传至我的Github仓库:https://github.com/ylw-github/Spring-MQTT-Demo.git
整个代码的结构:
生产者(producer)
依赖:
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>com.ylw</groupId> <artifactId>spring-mqtt-demo</artifactId> <version>1.0-SNAPSHOT</version> </parent> <artifactId>producer</artifactId> <dependencies> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-core</artifactId> <version>4.3.9.RELEASE</version> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-mqtt</artifactId> <version>4.3.9.RELEASE</version> </dependency> <dependency> <groupId>org.eclipse.paho</groupId> <artifactId>org.eclipse.paho.client.mqttv3</artifactId> <version>1.2.0</version> </dependency> </dependencies> </project>
配置文件(mqtt_producer.xml):
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration" xmlns:context="http://www.springframework.org/schema/context" xmlns:int-mqtt="http://www.springframework.org/schema/integration/mqtt" xsi:schemaLocation=" http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-4.1.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.1.xsd http://www.springframework.org/schema/integration/mqtt http://www.springframework.org/schema/integration/mqtt/spring-integration-mqtt-4.1.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.1.xsd"> <bean id="clientFactory" class="org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory"> <property name="userName" value="user1"/> <property name="password" value="123456"/> <property name="serverURIs"> <array> <value>tcp://127.0.0.1:1883</value> </array> </property> </bean> <bean id="mqtt" class="org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler"> <constructor-arg name="clientId" value="clientId_producer"></constructor-arg> <constructor-arg name="clientFactory" ref="clientFactory"></constructor-arg> </bean> </beans>
单元测试类(ProducerTestUnit.java):
package com.ylw; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler; import org.springframework.integration.mqtt.support.MqttHeaders; import org.springframework.integration.support.MessageBuilder; import org.springframework.messaging.Message; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; @RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(locations = "classpath:mqtt_producer.xml") public class ProducerTestUnit { @Autowired private MqttPahoMessageHandler mqtt; @Test public void sendTextQueue() { sendMqttMsg("testTopic", 2, "hello world send..."); } /** * @param topicName 主题名字 * @param message 发送的消息 * @author YangLinWei * @date 2019/9/4 10:17 * @qos 请求服务质量,0:至多一次,1:至少一次,2:刚好一次 */ private void sendMqttMsg(String topicName, int qos, String message) { Message<String> messages = MessageBuilder.withPayload(message).setHeader(MqttHeaders.TOPIC, topicName) .setHeader(MqttHeaders.QOS, qos) .setHeader(MqttHeaders.RETAINED, true).build(); mqtt.handleMessage(messages); } }
消费者(consumer):
依赖:
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <artifactId>spring-mqtt-demo</artifactId> <groupId>com.ylw</groupId> <version>1.0-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> <artifactId>consumer</artifactId> <dependencies> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-core</artifactId> <version>4.3.9.RELEASE</version> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-mqtt</artifactId> <version>4.3.9.RELEASE</version> </dependency> <dependency> <groupId>org.eclipse.paho</groupId> <artifactId>org.eclipse.paho.client.mqttv3</artifactId> <version>1.2.0</version> </dependency> </dependencies> </project>
配置文件(mqtt_consumer.xml):
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration" xmlns:context="http://www.springframework.org/schema/context" xmlns:int-mqtt="http://www.springframework.org/schema/integration/mqtt" xsi:schemaLocation=" http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-4.1.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.1.xsd http://www.springframework.org/schema/integration/mqtt http://www.springframework.org/schema/integration/mqtt/spring-integration-mqtt-4.1.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.1.xsd"> <bean id="mqttCallback" class="com.ylw.MqttReciever"></bean> <bean id="options" class="org.eclipse.paho.client.mqttv3.MqttConnectOptions"> <property name="cleanSession" value="true"></property> <property name="connectionTimeout" value="30"></property> <property name="keepAliveInterval" value="45"></property> <property name="userName" value="user1"></property> <property name="password" value="123456"></property> </bean> <bean id="client" class="org.eclipse.paho.client.mqttv3.MqttClient"> <constructor-arg name="serverURI" value="tcp://127.0.0.1:1883"></constructor-arg> <constructor-arg name="clientId" value="clientId_consumer"></constructor-arg> <property name="callback" ref="mqttCallback"></property> </bean> </beans>
单元测试类(ConsumerTestUnit.java):
import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; @RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(locations = "classpath:mqtt_consumer.xml") public class ConsumerTestUnit { @Autowired MqttClient client; @Autowired private MqttConnectOptions options; @Test public void testQueue() { try { client.connect(options); client.subscribe("testTopic", 2); System.in.read(); } catch (Exception e) { e.printStackTrace(); } } }
消息监听类(MqttReciever.java):
package com.ylw; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttMessage; public class MqttReciever implements MqttCallback { @Override public void connectionLost(Throwable throwable) { } @Override public void messageArrived(String topic, MqttMessage message) throws Exception { System.out.println("Client 接收消息主题 : " + topic); System.out.println("Client 接收消息Qos : " + message.getQos()); System.out.println("Client 接收消息内容 : " + new String(message.getPayload())); } @Override public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { } }