10 Spring集成MQTT(生产者与消费者)

简介: 10 Spring集成MQTT(生产者与消费者)

代码已上传至我的Github仓库:https://github.com/ylw-github/Spring-MQTT-Demo.git

整个代码的结构:

生产者(producer)

依赖:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>com.ylw</groupId>
        <artifactId>spring-mqtt-demo</artifactId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <artifactId>producer</artifactId>
    <dependencies>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-core</artifactId>
            <version>4.3.9.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-mqtt</artifactId>
            <version>4.3.9.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.eclipse.paho</groupId>
            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
            <version>1.2.0</version>
        </dependency>
    </dependencies>
</project>

配置文件(mqtt_producer.xml):

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:int-mqtt="http://www.springframework.org/schema/integration/mqtt"
       xsi:schemaLocation="
        http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-4.1.xsd
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.1.xsd
        http://www.springframework.org/schema/integration/mqtt http://www.springframework.org/schema/integration/mqtt/spring-integration-mqtt-4.1.xsd
        http://www.springframework.org/schema/context
        http://www.springframework.org/schema/context/spring-context-4.1.xsd">
    <bean id="clientFactory"
          class="org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory">
        <property name="userName" value="user1"/>
        <property name="password" value="123456"/>
        <property name="serverURIs">
            <array>
                <value>tcp://127.0.0.1:1883</value>
            </array>
        </property>
    </bean>
    <bean id="mqtt" class="org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler">
        <constructor-arg name="clientId" value="clientId_producer"></constructor-arg>
        <constructor-arg name="clientFactory" ref="clientFactory"></constructor-arg>
    </bean>
</beans>

单元测试类(ProducerTestUnit.java):

package com.ylw;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:mqtt_producer.xml")
public class ProducerTestUnit {
    @Autowired
    private MqttPahoMessageHandler mqtt;
    @Test
    public void sendTextQueue() {
        sendMqttMsg("testTopic", 2, "hello world send...");
    }
    /**
     * @param topicName 主题名字
     * @param message   发送的消息
     * @author YangLinWei
     * @date 2019/9/4  10:17
     * @qos 请求服务质量,0:至多一次,1:至少一次,2:刚好一次
     */
    private void sendMqttMsg(String topicName, int qos, String message) {
        Message<String> messages = MessageBuilder.withPayload(message).setHeader(MqttHeaders.TOPIC, topicName)
                .setHeader(MqttHeaders.QOS, qos)
                .setHeader(MqttHeaders.RETAINED, true).build();
        mqtt.handleMessage(messages);
    }
}

消费者(consumer):

依赖:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>spring-mqtt-demo</artifactId>
        <groupId>com.ylw</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>
    <artifactId>consumer</artifactId>
    <dependencies>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-core</artifactId>
            <version>4.3.9.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-mqtt</artifactId>
            <version>4.3.9.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.eclipse.paho</groupId>
            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
            <version>1.2.0</version>
        </dependency>
    </dependencies>
</project>

配置文件(mqtt_consumer.xml):

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:int-mqtt="http://www.springframework.org/schema/integration/mqtt"
       xsi:schemaLocation="
        http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-4.1.xsd
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.1.xsd
        http://www.springframework.org/schema/integration/mqtt http://www.springframework.org/schema/integration/mqtt/spring-integration-mqtt-4.1.xsd
        http://www.springframework.org/schema/context
        http://www.springframework.org/schema/context/spring-context-4.1.xsd">
    <bean id="mqttCallback" class="com.ylw.MqttReciever"></bean>
    <bean id="options" class="org.eclipse.paho.client.mqttv3.MqttConnectOptions">
        <property name="cleanSession" value="true"></property>
        <property name="connectionTimeout" value="30"></property>
        <property name="keepAliveInterval" value="45"></property>
        <property name="userName" value="user1"></property>
        <property name="password" value="123456"></property>
    </bean>
    <bean id="client" class="org.eclipse.paho.client.mqttv3.MqttClient">
        <constructor-arg name="serverURI" value="tcp://127.0.0.1:1883"></constructor-arg>
        <constructor-arg name="clientId" value="clientId_consumer"></constructor-arg>
        <property name="callback" ref="mqttCallback"></property>
    </bean>
</beans>

单元测试类(ConsumerTestUnit.java):

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:mqtt_consumer.xml")
public class ConsumerTestUnit {
    @Autowired
    MqttClient client;
    @Autowired
    private MqttConnectOptions options;
    @Test
    public void testQueue() {
        try {
            client.connect(options);
            client.subscribe("testTopic", 2);
            System.in.read();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

消息监听类(MqttReciever.java):

package com.ylw;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
public class MqttReciever implements MqttCallback {
    @Override
    public void connectionLost(Throwable throwable) {
    }
    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        System.out.println("Client 接收消息主题 : " + topic);
        System.out.println("Client 接收消息Qos : " + message.getQos());
        System.out.println("Client 接收消息内容 : " + new String(message.getPayload()));
    }
    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
    }
}
相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
2月前
|
资源调度 Java 调度
Spring Cloud Alibaba 集成分布式定时任务调度功能
定时任务在企业应用中至关重要,常用于异步数据处理、自动化运维等场景。在单体应用中,利用Java的`java.util.Timer`或Spring的`@Scheduled`即可轻松实现。然而,进入微服务架构后,任务可能因多节点并发执行而重复。Spring Cloud Alibaba为此发布了Scheduling模块,提供轻量级、高可用的分布式定时任务解决方案,支持防重复执行、分片运行等功能,并可通过`spring-cloud-starter-alibaba-schedulerx`快速集成。用户可选择基于阿里云SchedulerX托管服务或采用本地开源方案(如ShedLock)
|
5天前
|
存储 前端开发 Java
Spring Boot 集成 MinIO 与 KKFile 实现文件预览功能
本文详细介绍如何在Spring Boot项目中集成MinIO对象存储系统与KKFileView文件预览工具,实现文件上传及在线预览功能。首先搭建MinIO服务器,并在Spring Boot中配置MinIO SDK进行文件管理;接着通过KKFileView提供文件预览服务,最终实现文档管理系统的高效文件处理能力。
|
2月前
|
测试技术 Java Spring
Spring 框架中的测试之道:揭秘单元测试与集成测试的双重保障,你的应用真的安全了吗?
【8月更文挑战第31天】本文以问答形式深入探讨了Spring框架中的测试策略,包括单元测试与集成测试的有效编写方法,及其对提升代码质量和可靠性的重要性。通过具体示例,展示了如何使用`@MockBean`、`@SpringBootTest`等注解来进行服务和控制器的测试,同时介绍了Spring Boot提供的测试工具,如`@DataJpaTest`,以简化数据库测试流程。合理运用这些测试策略和工具,将助力开发者构建更为稳健的软件系统。
39 0
|
2月前
|
数据库 开发者 Java
颠覆传统开发:Hibernate与Spring Boot的集成,让你的开发效率飞跃式提升!
【8月更文挑战第31天】在 Java 开发中,Spring Boot 和 Hibernate 已成为许多开发者的首选技术栈。Spring Boot 简化了配置和部署过程,而 Hibernate 则是一个强大的 ORM 框架,用于管理数据库交互。将两者结合使用,可以极大提升开发效率并构建高性能的现代 Java 应用。本文将通过代码示例展示如何在 Spring Boot 项目中集成 Hibernate,并实现基本的数据库操作,包括添加依赖、配置数据源、创建实体类和仓库接口,以及在服务层和控制器中处理 HTTP 请求。这种组合不仅简化了配置,还提供了一套强大的工具来快速开发现代 Java 应用程序。
62 0
|
2月前
|
消息中间件 安全 Java
Spring Boot 基于 SCRAM 认证集成 Kafka 的详解
【8月更文挑战第4天】本文详解Spring Boot结合SCRAM认证集成Kafka的过程。SCRAM为Kafka提供安全身份验证。首先确认Kafka服务已启用SCRAM,并准备认证凭据。接着,在`pom.xml`添加`spring-kafka`依赖,并在`application.properties`中配置Kafka属性,包括SASL_SSL协议与SCRAM-SHA-256机制。创建生产者与消费者类以实现消息的发送与接收功能。最后,通过实际消息传递测试集成效果与认证机制的有效性。
|
2月前
|
消息中间件 Java RocketMQ
微服务架构师的福音:深度解析Spring Cloud RocketMQ,打造高可靠消息驱动系统的不二之选!
【8月更文挑战第29天】Spring Cloud RocketMQ结合了Spring Cloud生态与RocketMQ消息中间件的优势,简化了RocketMQ在微服务中的集成,使开发者能更专注业务逻辑。通过配置依赖和连接信息,可轻松搭建消息生产和消费流程,支持消息过滤、转换及分布式事务等功能,确保微服务间解耦的同时,提升了系统的稳定性和效率。掌握其应用,有助于构建复杂分布式系统。
40 0
|
2月前
|
自然语言处理 安全 Java
Spring Boot中集成Lucence
本节课首先详细的分析了全文检索的理论规则,然后结合 Lucene,系统的讲述了在 Spring Boot 的集成步骤,首先快速带领大家从直观上感受 Lucene 如何建立索引已经如果检索,其次通过中文检索的具体实例,展示了 Lucene 在全文检索中的广泛应用。Lucene 不难,主要就是步骤比较多,代码不用死记硬背,拿到项目中根据实际情况做对应的修改即可。
|
2月前
|
NoSQL Java Redis
Spring Boot集成Redis全攻略:高效数据存取,打造性能飞跃的Java微服务应用!
【8月更文挑战第3天】Spring Boot是备受欢迎的微服务框架,以其快速开发与轻量特性著称。结合高性能键值数据库Redis,可显著增强应用性能。集成步骤包括:添加`spring-boot-starter-data-redis`依赖,配置Redis服务器参数,注入`RedisTemplate`或`StringRedisTemplate`进行数据操作。这种集成方案适用于缓存、高并发等场景,有效提升数据处理效率。
334 2
|
2月前
|
Java Spring
【Azure 事件中心】Spring Boot 集成 Event Hub(azure-spring-cloud-stream-binder-eventhubs)指定Partition Key有异常消息
【Azure 事件中心】Spring Boot 集成 Event Hub(azure-spring-cloud-stream-binder-eventhubs)指定Partition Key有异常消息
|
2月前
|
安全 Java 数据库
Spring Boot中集成 Shiro
本节主要介绍了 Shiro 安全框架与 Spring Boot 的整合。先介绍了 Shiro 的三大核心组件已经它们的作用;然后介绍了 Shiro 的身份认证、角色认证和权限认证;最后结合代码,详细介绍了 Spring Boot 中是如何整合 Shiro 的,并设计了一套测试流程,逐步分析 Shiro 的工作流程和原理,让读者更直观地体会出 Shiro 的整套工作流程。Shiro 使用的很广泛,希望读者将其掌握,并能运用到实际项目中。
下一篇
无影云桌面