Java原生结合MQTTX,完成心跳对话

简介: 简介:本文带你用Java结合MQTT协议与EMQX服务器,在Ubuntu上实现两个程序的“隔空传话”。通过搭建消息代理、编写发送/接收代码,让Java应用实现实时通信,附完整源码与调试技巧,轻松掌握物联网通信核心技能。✨

简言:当Java遇上MQTT:打造会"隔空传话"的魔法程序

导语:想不想让两个Java程序像哈利波特里的双面镜一样实时对话?今天我们将用MQTT协议+EMQX,在Ubuntu上搭建一个魔法邮局,再亲手编写会传信的Java程序!gitCode平台附赠【案例源码】🔥


源码地址:https://blog.csdn.net/huangzhe0701/article/details/145205822

参考文档:

  1. 在 Ubuntu 上安装 EMQX:https://docs.emqx.com/zh/emqx/latest/deploy/install-ubuntu.html
  2. MQTTX 下载:https://mqttx.app/zh/downloads

一、🛠️ 搭建魔法邮局(EMQX服务器)

扩展:在安装EMQX前记得先更新先软件包

apt update

1. 安装EMQX企业版
在Ubuntu终端输入以下咒语:

# 下载魔法卷轴(安装包)
wget https://www.emqx.com/zh/downloads/enterprise/5.9.0/emqx-enterprise-5.9.0-ubuntu24.04-amd64.deb

# 解开卷轴封印
sudo dpkg -i emqx-enterprise-5.9.0-ubuntu20.04-amd64.deb

# 启动邮局服务
sudo systemctl start emqx

2. 打开魔法管理台
浏览器访问 http://localhost:18083,默认账号admin/public,你将看到:


二、📱 准备第一个信使(MQTTX客户端)

安装MQTTX桌面版

安装地址:https://mqttx.app/zh/downloads

打开后新建连接:

  • 名称:魔法邮箱_varin.cn
  • 服务器:varin:1883


三、 编写会魔法的Java程序

1. 添加咒语依赖(Maven)

<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    <version>1.2.5</version>
</dependency>

2. MQTT连接核心代码

package cn.varin;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.junit.jupiter.api.Test;
public class MqttConnectionTest {
    /**
     *
     *
     * String serverURI,(mqtt服务端地址)
     * String clientId,(客户端id)
     * MqttClientPersistence persistence(内存持久类)
     *
     *
     */
    public static String serviceURL= "tcp://varin.cn:1883";
    public static String clientId="varya_test_01";
    public static MqttClient client;
    String user="varya";
    String password= "123456";
    static {
        try {
            // 建立一个mqqt客户端类
            client = new MqttClient(serviceURL,clientId,new MemoryPersistence());
        } catch (MqttException e) {
            throw new RuntimeException(e);
        }
    }
    // 创建mqtt连接
    @Test
    public void createConnectionTest() throws MqttException {
        MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
        mqttConnectOptions.setUserName(user);
        mqttConnectOptions.setPassword(password.toCharArray());
        client.connect(mqttConnectOptions);
//        注意:因为用test类方法执行的话,太快,可能看不出来,是否正真的建立的连接,所以添加一个死循环来保持程序的存在。
        while (true);
    }
}

2. MQTT发送消息

package cn.varin;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.junit.jupiter.api.Test;
public class MqttSendMessageTest {
    /**
     *
     *
     * String serverURI,(mqtt服务端地址)
     * String clientId,(客户端id)
     * MqttClientPersistence persistence(内存持久类)
     *
     *
     */
    public static String serviceURL= "tcp://varin.cn:1883";
    public static String clientId="varya_test_01";
    public static MqttClient client;
    public static String user="varya";
   public static String password= "123456";
    static {
        try {
            // 建立一个mqqt客户端类
            client = new MqttClient(serviceURL,clientId,new MemoryPersistence());
            MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
            mqttConnectOptions.setUserName(user);
            mqttConnectOptions.setPassword(password.toCharArray());
            client.connect(mqttConnectOptions);
        } catch (MqttException e) {
            throw new RuntimeException(e);
        }
    }
    // 发送消息
    @Test
    public void SendMessageTest() throws MqttException {
        // 设置消息
        String message = "hello mqttx Client";
        MqttMessage mqttMessage = new MqttMessage(message.getBytes());
        mqttMessage.setQos(2);
        // 当一个主题的消息设置了 setRetained(true) 后,这条消息会存储在 Broker 中。如果后续有新的客户端订阅这个主题,则无论何时订阅,都会立即收到最近的一条带有 retained 属性的消息作为初始数据2。如果没有设置 retained 或者之前的消息未被保留,则新订阅者不会接收到任何历史消息。,
        mqttMessage.setRetained(true);
        // 发送消息
        client.publish("java_and_mqttx_conn", mqttMessage);
        //发送完,关闭连接
        client.disconnect();
        client.close();
    }
}

3. MQTT接收消息

package cn.varin;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.junit.jupiter.api.Test;
public class MqttReceiveMessageTest {
    /**
     *
     *
     * String serverURI,(mqtt服务端地址)
     * String clientId,(客户端id)
     * MqttClientPersistence persistence(内存持久类)
     *
     *
     */
    public static String serviceURL= "tcp://varin.cn:1883";
    public static String clientId="varya_test_01";
    public static MqttClient client;
    public static String user="varya";
   public static String password= "123456";
    static {
        try {
            // 建立一个mqqt客户端类
            client = new MqttClient(serviceURL,clientId,new MemoryPersistence());
            MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
            mqttConnectOptions.setUserName(user);
            mqttConnectOptions.setPassword(password.toCharArray());
            client.connect(mqttConnectOptions);
        } catch (MqttException e) {
            throw new RuntimeException(e);
        }
    }
    // 接受消息
    @Test
    public void SendMessageTest() throws MqttException {
       client.subscribe("java_and_mqttx_conn",2);
       // 建立接收消息回调
        client.setCallback(new MqttCallback() {
            @Override
            public void connectionLost(Throwable throwable) {
                // 连接丢失时调用
                System.out.println("cooection error");
            }
            @Override
            public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
                // 接收到消息时调用
                System.out.println("来自主题:"+s);
                System.out.println("接收到的消息为:"+new String(mqttMessage.getPayload()));
            }
            @Override
            public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
                // 解释消息结束时调用
                System.out.println("deliveryComplete");
            }
        });
        // 为了保持test类的连接,建立一个死循环语句
        while (true);
    }
}



四、🐉 彩蛋:会喷火的测试恐龙

参考文章:https://blog.csdn.net/huangzhe0701/article/details/145205822

在终端运行:

mosquitto_pub -h localhost -t "java_and_mqttx_conn" -m "恐龙喷火啦~🔥"

观察Java程序是否输出火焰日志!


五、💡 常见魔法失效对策

  1. 检查1883端口是否被麻瓜防火墙阻挡
  2. 确认EMQX服务像打人柳一样活跃(sudo systemctl status emqx
  3. Java依赖是否像魔药材料一样齐全

结语:现在你的Java程序已经获得了通信魔法!快来用MQTT实现更多神奇功能吧~ 如果(程序)不显形,欢迎在评论区召唤帮忙!🎩

相关实践学习
快速体验阿里云云消息队列RocketMQ版
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
java原生发送http请求
java原生发送http请求
351 0
|
数据采集 小程序 数据可视化
智慧校园电子班牌管理系统源码 Java Android原生
家长通过家长小程序端随时了解孩子在校的情况,实时接收学生的出勤情况,学生到校、离校时间。随时了解学生在校的表现、学生成绩排名,及时与教师沟通,关注孩子的健康成长。
237 0
智慧校园电子班牌管理系统源码 Java Android原生
|
JSON 自然语言处理 Java
Java原生操作Elasticsearch
Java原生操作Elasticsearch
323 0
|
编解码 小程序 Java
【Java】基于云计算-智慧校园电子班牌系统源码带原生微信小程序端
【Java】基于云计算-智慧校园电子班牌系统源码带原生微信小程序端
264 0
|
Java
让星星⭐月亮告诉你,自定义定时器和Java自带原生定时器
定时器是一种可以设置多个具有不同执行时间和间隔的任务的工具。本文介绍了定时器的基本概念、如何自定义实现一个定时器,以及Java原生定时器的使用方法,包括定义定时任务接口、实现任务、定义任务处理线程和使用Java的`Timer`与`TimerTask`类来管理和执行定时任务。
480 3
|
网络协议 JavaScript 前端开发
Java一分钟之-GraalVM Native Image:构建原生可执行文件
【6月更文挑战第13天】GraalVM Native Image是Java开发的创新技术,它将应用编译成独立的原生可执行文件,实现快速启动和低内存消耗,对微服务、桌面应用和嵌入式系统有重大影响。本文讨论了如何使用Native Image,包括常见挑战如反射与动态类加载、静态初始化问题和依赖冲突,并提供了解决方案和代码示例。通过合理规划和利用GraalVM工具,开发者可以克服这些问题,充分利用Native Image提升应用性能。
785 5
|
传感器 小程序 搜索推荐
(源码)java开发的一套(智慧校园系统源码、电子班牌、原生小程序开发)多端展示:web端、saas端、家长端、教师端
通过电子班牌设备和智慧校园数据平台的统一管理,在电子班牌上,班牌展示、学生上课刷卡考勤、考勤状况汇总展示,课表展示,考场管理,请假管理,成绩查询,考试优秀标兵展示、校园通知展示,班级文化各片展示等多种化展示。
264 0
(源码)java开发的一套(智慧校园系统源码、电子班牌、原生小程序开发)多端展示:web端、saas端、家长端、教师端
|
Kubernetes Cloud Native Java
探索未来编程新纪元:Quarkus带你秒建高性能Kubernetes原生Java应用,云原生时代的技术狂欢!
Quarkus 是专为 Kubernetes 设计的全栈云原生 Java 框架,凭借其轻量级、快速启动及高效执行特性,在 Java 社区脱颖而出。通过编译时优化与原生镜像支持,Quarkus 提升了应用性能,同时保持了 Java 的熟悉度与灵活性。本文将指导你从创建项目、编写 REST 控制器到构建与部署 Kubernetes 原生镜像的全过程,让你快速上手 Quarkus,体验高效开发与部署的乐趣。
508 1
|
XML Java 数据格式
【JAVA日志框架】JUL,JDK原生日志框架详解。
【JAVA日志框架】JUL,JDK原生日志框架详解。
138 0
|
Kubernetes Cloud Native Java
Java一分钟之-Quarkus:Kubernetes原生的Java框架
【6月更文挑战第12天】Quarkus是面向Kubernetes的Java框架,以其超快启动速度和低内存占用著称。核心特性包括AOT编译实现毫秒级启动、优化的运行时模型、与Kubernetes的无缝集成及丰富的扩展库。常见问题涉及Maven依赖管理、热重载机制理解和配置文件的忽视。解决这些问题的关键在于深入学习官方文档、使用Dev UI调试和参与社区交流。通过代码示例展示了如何快速创建REST服务。掌握Quarkus能提升开发效率,适应微服务架构。
334 0

热门文章

最新文章