10 Spring集成MQTT(生产者与消费者)

简介: 10 Spring集成MQTT(生产者与消费者)

代码已上传至我的Github仓库:https://github.com/ylw-github/Spring-MQTT-Demo.git

整个代码的结构:

生产者(producer)

依赖:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>com.ylw</groupId>
        <artifactId>spring-mqtt-demo</artifactId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <artifactId>producer</artifactId>
    <dependencies>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-core</artifactId>
            <version>4.3.9.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-mqtt</artifactId>
            <version>4.3.9.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.eclipse.paho</groupId>
            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
            <version>1.2.0</version>
        </dependency>
    </dependencies>
</project>

配置文件(mqtt_producer.xml):

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:int-mqtt="http://www.springframework.org/schema/integration/mqtt"
       xsi:schemaLocation="
        http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-4.1.xsd
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.1.xsd
        http://www.springframework.org/schema/integration/mqtt http://www.springframework.org/schema/integration/mqtt/spring-integration-mqtt-4.1.xsd
        http://www.springframework.org/schema/context
        http://www.springframework.org/schema/context/spring-context-4.1.xsd">
    <bean id="clientFactory"
          class="org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory">
        <property name="userName" value="user1"/>
        <property name="password" value="123456"/>
        <property name="serverURIs">
            <array>
                <value>tcp://127.0.0.1:1883</value>
            </array>
        </property>
    </bean>
    <bean id="mqtt" class="org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler">
        <constructor-arg name="clientId" value="clientId_producer"></constructor-arg>
        <constructor-arg name="clientFactory" ref="clientFactory"></constructor-arg>
    </bean>
</beans>

单元测试类(ProducerTestUnit.java):

package com.ylw;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:mqtt_producer.xml")
public class ProducerTestUnit {
    @Autowired
    private MqttPahoMessageHandler mqtt;
    @Test
    public void sendTextQueue() {
        sendMqttMsg("testTopic", 2, "hello world send...");
    }
    /**
     * @param topicName 主题名字
     * @param message   发送的消息
     * @author YangLinWei
     * @date 2019/9/4  10:17
     * @qos 请求服务质量,0:至多一次,1:至少一次,2:刚好一次
     */
    private void sendMqttMsg(String topicName, int qos, String message) {
        Message<String> messages = MessageBuilder.withPayload(message).setHeader(MqttHeaders.TOPIC, topicName)
                .setHeader(MqttHeaders.QOS, qos)
                .setHeader(MqttHeaders.RETAINED, true).build();
        mqtt.handleMessage(messages);
    }
}

消费者(consumer):

依赖:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>spring-mqtt-demo</artifactId>
        <groupId>com.ylw</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>
    <artifactId>consumer</artifactId>
    <dependencies>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-core</artifactId>
            <version>4.3.9.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-mqtt</artifactId>
            <version>4.3.9.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.eclipse.paho</groupId>
            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
            <version>1.2.0</version>
        </dependency>
    </dependencies>
</project>

配置文件(mqtt_consumer.xml):

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:int-mqtt="http://www.springframework.org/schema/integration/mqtt"
       xsi:schemaLocation="
        http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-4.1.xsd
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.1.xsd
        http://www.springframework.org/schema/integration/mqtt http://www.springframework.org/schema/integration/mqtt/spring-integration-mqtt-4.1.xsd
        http://www.springframework.org/schema/context
        http://www.springframework.org/schema/context/spring-context-4.1.xsd">
    <bean id="mqttCallback" class="com.ylw.MqttReciever"></bean>
    <bean id="options" class="org.eclipse.paho.client.mqttv3.MqttConnectOptions">
        <property name="cleanSession" value="true"></property>
        <property name="connectionTimeout" value="30"></property>
        <property name="keepAliveInterval" value="45"></property>
        <property name="userName" value="user1"></property>
        <property name="password" value="123456"></property>
    </bean>
    <bean id="client" class="org.eclipse.paho.client.mqttv3.MqttClient">
        <constructor-arg name="serverURI" value="tcp://127.0.0.1:1883"></constructor-arg>
        <constructor-arg name="clientId" value="clientId_consumer"></constructor-arg>
        <property name="callback" ref="mqttCallback"></property>
    </bean>
</beans>

单元测试类(ConsumerTestUnit.java):

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:mqtt_consumer.xml")
public class ConsumerTestUnit {
    @Autowired
    MqttClient client;
    @Autowired
    private MqttConnectOptions options;
    @Test
    public void testQueue() {
        try {
            client.connect(options);
            client.subscribe("testTopic", 2);
            System.in.read();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

消息监听类(MqttReciever.java):

package com.ylw;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
public class MqttReciever implements MqttCallback {
    @Override
    public void connectionLost(Throwable throwable) {
    }
    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        System.out.println("Client 接收消息主题 : " + topic);
        System.out.println("Client 接收消息Qos : " + message.getQos());
        System.out.println("Client 接收消息内容 : " + new String(message.getPayload()));
    }
    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
    }
}
相关实践学习
快速体验阿里云云消息队列RocketMQ版
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
2月前
|
安全 Java 数据库
第16课:Spring Boot中集成 Shiro
第16课:Spring Boot中集成 Shiro
471 0
|
2月前
|
消息中间件 存储 Java
第15课: Spring Boot中集成ActiveMQ
第15课: Spring Boot中集成ActiveMQ
267 0
|
3月前
|
人工智能 Java 测试技术
Spring Boot 集成 JUnit 单元测试
本文介绍了在Spring Boot中使用JUnit 5进行单元测试的常用方法与技巧,包括添加依赖、编写测试类、使用@SpringBootTest参数、自动装配测试模块(如JSON、MVC、WebFlux、JDBC等),以及@MockBean和@SpyBean的应用。内容实用,适合Java开发者参考学习。
414 0
|
6月前
|
安全 Java Apache
微服务——SpringBoot使用归纳——Spring Boot中集成 Shiro——Shiro 身份和权限认证
本文介绍了 Apache Shiro 的身份认证与权限认证机制。在身份认证部分,分析了 Shiro 的认证流程,包括应用程序调用 `Subject.login(token)` 方法、SecurityManager 接管认证以及通过 Realm 进行具体的安全验证。权限认证部分阐述了权限(permission)、角色(role)和用户(user)三者的关系,其中用户可拥有多个角色,角色则对应不同的权限组合,例如普通用户仅能查看或添加信息,而管理员可执行所有操作。
297 0
|
6月前
|
安全 Java 数据安全/隐私保护
微服务——SpringBoot使用归纳——Spring Boot中集成 Shiro——Shiro 三大核心组件
本课程介绍如何在Spring Boot中集成Shiro框架,主要讲解Shiro的认证与授权功能。Shiro是一个简单易用的Java安全框架,用于认证、授权、加密和会话管理等。其核心组件包括Subject(认证主体)、SecurityManager(安全管理员)和Realm(域)。Subject负责身份认证,包含Principals(身份)和Credentials(凭证);SecurityManager是架构核心,协调内部组件运作;Realm则是连接Shiro与应用数据的桥梁,用于访问用户账户及权限信息。通过学习,您将掌握Shiro的基本原理及其在项目中的应用。
234 0
|
2月前
|
缓存 JSON 前端开发
第07课:Spring Boot集成Thymeleaf模板引擎
第07课:Spring Boot集成Thymeleaf模板引擎
355 0
第07课:Spring Boot集成Thymeleaf模板引擎
|
4月前
|
消息中间件 缓存 NoSQL
基于Spring Data Redis与RabbitMQ实现字符串缓存和计数功能(数据同步)
总的来说,借助Spring Data Redis和RabbitMQ,我们可以轻松实现字符串缓存和计数的功能。而关键的部分不过是一些"厨房的套路",一旦你掌握了这些套路,那么你就像厨师一样可以准备出一道道饕餮美食了。通过这种方式促进数据处理效率无疑将大大提高我们的生产力。
170 32
|
3月前
|
监控 安全 Java
Java 开发中基于 Spring Boot 3.2 框架集成 MQTT 5.0 协议实现消息推送与订阅功能的技术方案解析
本文介绍基于Spring Boot 3.2集成MQTT 5.0的消息推送与订阅技术方案,涵盖核心技术栈选型(Spring Boot、Eclipse Paho、HiveMQ)、项目搭建与配置、消息发布与订阅服务实现,以及在智能家居控制系统中的应用实例。同时,详细探讨了安全增强(TLS/SSL)、性能优化(异步处理与背压控制)、测试监控及生产环境部署方案,为构建高可用、高性能的消息通信系统提供全面指导。附资源下载链接:[https://pan.quark.cn/s/14fcf913bae6](https://pan.quark.cn/s/14fcf913bae6)。
494 0
|
5月前
|
Java 开发工具 Spring
【Azure Application Insights】为Spring Boot应用集成Application Insight SDK
本文以Java Spring Boot项目为例,详细说明如何集成Azure Application Insights SDK以收集和展示日志。内容包括三步配置:1) 在`pom.xml`中添加依赖项`applicationinsights-runtime-attach`和`applicationinsights-core`;2) 在main函数中调用`ApplicationInsights.attach()`;3) 配置`applicationinsights.json`文件。同时提供问题排查建议及自定义日志方法示例,帮助用户顺利集成并使用Application Insights服务。
122 8
|
6月前
|
消息中间件 Java 微服务
微服务——SpringBoot使用归纳——Spring Boot中集成ActiveMQ——发布/订阅消息的生产和消费
本文详细讲解了Spring Boot中ActiveMQ的发布/订阅消息机制,包括消息生产和消费的具体实现方式。生产端通过`sendMessage`方法发送订阅消息,消费端则需配置`application.yml`或自定义工厂以支持topic消息监听。为解决点对点与发布/订阅消息兼容问题,可通过设置`containerFactory`实现两者共存。最后,文章还提供了测试方法及总结,帮助读者掌握ActiveMQ在异步消息处理中的应用。
253 0

热门文章

最新文章