10 Spring集成MQTT(生产者与消费者)

简介: 10 Spring集成MQTT(生产者与消费者)

代码已上传至我的Github仓库:https://github.com/ylw-github/Spring-MQTT-Demo.git

整个代码的结构:

生产者(producer)

依赖:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>com.ylw</groupId>
        <artifactId>spring-mqtt-demo</artifactId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <artifactId>producer</artifactId>
    <dependencies>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-core</artifactId>
            <version>4.3.9.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-mqtt</artifactId>
            <version>4.3.9.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.eclipse.paho</groupId>
            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
            <version>1.2.0</version>
        </dependency>
    </dependencies>
</project>

配置文件(mqtt_producer.xml):

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:int-mqtt="http://www.springframework.org/schema/integration/mqtt"
       xsi:schemaLocation="
        http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-4.1.xsd
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.1.xsd
        http://www.springframework.org/schema/integration/mqtt http://www.springframework.org/schema/integration/mqtt/spring-integration-mqtt-4.1.xsd
        http://www.springframework.org/schema/context
        http://www.springframework.org/schema/context/spring-context-4.1.xsd">
    <bean id="clientFactory"
          class="org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory">
        <property name="userName" value="user1"/>
        <property name="password" value="123456"/>
        <property name="serverURIs">
            <array>
                <value>tcp://127.0.0.1:1883</value>
            </array>
        </property>
    </bean>
    <bean id="mqtt" class="org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler">
        <constructor-arg name="clientId" value="clientId_producer"></constructor-arg>
        <constructor-arg name="clientFactory" ref="clientFactory"></constructor-arg>
    </bean>
</beans>

单元测试类(ProducerTestUnit.java):

package com.ylw;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:mqtt_producer.xml")
public class ProducerTestUnit {
    @Autowired
    private MqttPahoMessageHandler mqtt;
    @Test
    public void sendTextQueue() {
        sendMqttMsg("testTopic", 2, "hello world send...");
    }
    /**
     * @param topicName 主题名字
     * @param message   发送的消息
     * @author YangLinWei
     * @date 2019/9/4  10:17
     * @qos 请求服务质量,0:至多一次,1:至少一次,2:刚好一次
     */
    private void sendMqttMsg(String topicName, int qos, String message) {
        Message<String> messages = MessageBuilder.withPayload(message).setHeader(MqttHeaders.TOPIC, topicName)
                .setHeader(MqttHeaders.QOS, qos)
                .setHeader(MqttHeaders.RETAINED, true).build();
        mqtt.handleMessage(messages);
    }
}

消费者(consumer):

依赖:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>spring-mqtt-demo</artifactId>
        <groupId>com.ylw</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>
    <artifactId>consumer</artifactId>
    <dependencies>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-core</artifactId>
            <version>4.3.9.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-mqtt</artifactId>
            <version>4.3.9.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.eclipse.paho</groupId>
            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
            <version>1.2.0</version>
        </dependency>
    </dependencies>
</project>

配置文件(mqtt_consumer.xml):

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:int-mqtt="http://www.springframework.org/schema/integration/mqtt"
       xsi:schemaLocation="
        http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-4.1.xsd
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.1.xsd
        http://www.springframework.org/schema/integration/mqtt http://www.springframework.org/schema/integration/mqtt/spring-integration-mqtt-4.1.xsd
        http://www.springframework.org/schema/context
        http://www.springframework.org/schema/context/spring-context-4.1.xsd">
    <bean id="mqttCallback" class="com.ylw.MqttReciever"></bean>
    <bean id="options" class="org.eclipse.paho.client.mqttv3.MqttConnectOptions">
        <property name="cleanSession" value="true"></property>
        <property name="connectionTimeout" value="30"></property>
        <property name="keepAliveInterval" value="45"></property>
        <property name="userName" value="user1"></property>
        <property name="password" value="123456"></property>
    </bean>
    <bean id="client" class="org.eclipse.paho.client.mqttv3.MqttClient">
        <constructor-arg name="serverURI" value="tcp://127.0.0.1:1883"></constructor-arg>
        <constructor-arg name="clientId" value="clientId_consumer"></constructor-arg>
        <property name="callback" ref="mqttCallback"></property>
    </bean>
</beans>

单元测试类(ConsumerTestUnit.java):

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:mqtt_consumer.xml")
public class ConsumerTestUnit {
    @Autowired
    MqttClient client;
    @Autowired
    private MqttConnectOptions options;
    @Test
    public void testQueue() {
        try {
            client.connect(options);
            client.subscribe("testTopic", 2);
            System.in.read();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

消息监听类(MqttReciever.java):

package com.ylw;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
public class MqttReciever implements MqttCallback {
    @Override
    public void connectionLost(Throwable throwable) {
    }
    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        System.out.println("Client 接收消息主题 : " + topic);
        System.out.println("Client 接收消息Qos : " + message.getQos());
        System.out.println("Client 接收消息内容 : " + new String(message.getPayload()));
    }
    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
    }
}
相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
3月前
|
Java Maven Docker
gitlab-ci 集成 k3s 部署spring boot 应用
gitlab-ci 集成 k3s 部署spring boot 应用
|
2天前
|
存储 安全 Java
Spring Boot 3 集成Spring AOP实现系统日志记录
本文介绍了如何在Spring Boot 3中集成Spring AOP实现系统日志记录功能。通过定义`SysLog`注解和配置相应的AOP切面,可以在方法执行前后自动记录日志信息,包括操作的开始时间、结束时间、请求参数、返回结果、异常信息等,并将这些信息保存到数据库中。此外,还使用了`ThreadLocal`变量来存储每个线程独立的日志数据,确保线程安全。文中还展示了项目实战中的部分代码片段,以及基于Spring Boot 3 + Vue 3构建的快速开发框架的简介与内置功能列表。此框架结合了当前主流技术栈,提供了用户管理、权限控制、接口文档自动生成等多项实用特性。
23 8
|
1月前
|
XML Java API
Spring Boot集成MinIO
本文介绍了如何在Spring Boot项目中集成MinIO,一个高性能的分布式对象存储服务。主要步骤包括:引入MinIO依赖、配置MinIO属性、创建MinIO配置类和服务类、使用服务类实现文件上传和下载功能,以及运行应用进行测试。通过这些步骤,可以轻松地在项目中使用MinIO的对象存储功能。
|
1月前
|
消息中间件 监控 Java
如何将Spring Boot + RabbitMQ应用程序部署到Pivotal Cloud Foundry (PCF)
如何将Spring Boot + RabbitMQ应用程序部署到Pivotal Cloud Foundry (PCF)
38 6
|
2月前
|
消息中间件 Java Kafka
什么是Apache Kafka?如何将其与Spring Boot集成?
什么是Apache Kafka?如何将其与Spring Boot集成?
76 5
|
2月前
|
消息中间件 Java Kafka
Spring Boot 与 Apache Kafka 集成详解:构建高效消息驱动应用
Spring Boot 与 Apache Kafka 集成详解:构建高效消息驱动应用
60 1
|
2月前
|
消息中间件 监控 Java
您是否已集成 Spring Boot 与 ActiveMQ?
您是否已集成 Spring Boot 与 ActiveMQ?
61 0
|
3月前
|
前端开发 Java 程序员
springboot 学习十五:Spring Boot 优雅的集成Swagger2、Knife4j
这篇文章是关于如何在Spring Boot项目中集成Swagger2和Knife4j来生成和美化API接口文档的详细教程。
331 1
|
3月前
|
Java Spring
springboot 学习十一:Spring Boot 优雅的集成 Lombok
这篇文章是关于如何在Spring Boot项目中集成Lombok,以简化JavaBean的编写,避免冗余代码,并提供了相关的配置步骤和常用注解的介绍。
145 0
|
3月前
|
人工智能 自然语言处理 前端开发
SpringBoot + 通义千问 + 自定义React组件:支持EventStream数据解析的技术实践
【10月更文挑战第7天】在现代Web开发中,集成多种技术栈以实现复杂的功能需求已成为常态。本文将详细介绍如何使用SpringBoot作为后端框架,结合阿里巴巴的通义千问(一个强大的自然语言处理服务),并通过自定义React组件来支持服务器发送事件(SSE, Server-Sent Events)的EventStream数据解析。这一组合不仅能够实现高效的实时通信,还能利用AI技术提升用户体验。
268 2
下一篇
开通oss服务