MQTT(EMQX) - Java 调用 MQTT Demo 代码

简介: MQTT(EMQX) - Java 调用 MQTT Demo 代码

POM

<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    <version>1.2.2</version>
</dependency>

Service.java

package com.vipsoft.mqtt;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import java.util.Scanner;
public class Service {
    public static void main(String[] args) throws Exception {
        String host = "tcp://172.16.3.88:1883";
        String topic = "VipSoft_MQTT";
        String clientId = "server_id"; // clientId不能重复这个是server的id
        //新建mqtt连接
        MqttConnectOptions options = new MqttConnectOptions();
        options.setCleanSession(true);
        //新建mqtt客户端
        MqttClient client = new MqttClient(host, clientId);
        client.connect(options);
        //新建mqtt消息
        MqttMessage message = new MqttMessage();
        @SuppressWarnings("resource")
        Scanner scanner = new Scanner(System.in);
        System.out.println("请输入要发送的内容:");
        while (true) {
            String MsgMessage= scanner.nextLine();
            message.setPayload(MsgMessage.getBytes());
            client.publish(topic, message);
        }
    }
}

Client.java

package com.vipsoft.mqtt;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
public class Client {
    public static void main(String[] args) throws Exception {
        String host = "tcp://172.16.3.88:1883";
        String topic = "VipSoft_MQTT";
        String clientId = "client_id";
        // 1.设置mqtt连接属性
        MqttConnectOptions options = new MqttConnectOptions();
        options.setCleanSession(true);
        // 2.实例化mqtt客户端
        MqttClient client = new MqttClient(host, clientId);
        // 3.连接
        client.connect(options);
        //这里的setCallback需要新建一个Callback类并实现 MqttCallback 这个类
        client.setCallback(new PushCallback());
        while (true) {
            client.subscribe(topic, 2);
        } 
    }
}

PushCallback.java

package com.vipsoft.mqtt;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Date;
/**
 * 发布消息的回调类
 *
 * 必须实现MqttCallback的接口并实现对应的相关接口方法CallBack 类将实现 MqttCallBack。
 * 每个客户机标识都需要一个回调实例。在此示例中,构造函数传递客户机标识以另存为实例数据。
 * 在回调中,将它用来标识已经启动了该回调的哪个实例。
 * 必须在回调类中实现三个方法:
 *
 *  public void messageArrived(MqttTopic topic, MqttMessage message)接收已经预订的发布。
 *
 *  public void connectionLost(Throwable cause)在断开连接时调用。
 *
 *  public void deliveryComplete(MqttDeliveryToken token))
 *  接收到已经发布的 QoS 1 或 QoS 2 消息的传递令牌时调用。
 *  由 MqttClient.connect 激活此回调。
 *
 */
public class PushCallback implements MqttCallback {
    private final Logger logger = LoggerFactory.getLogger(this.getClass());
    @Override
    public void connectionLost(Throwable cause) {
        // 连接丢失后进行重连
        System.out.println("连接断开,可以做重连");
        logger.info("掉线时间:{}", new Date());
    }
    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        System.out.println("deliveryComplete---------" + token.isComplete());
    }
    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        // subscribe后得到的消息会执行到这里面
        // System.out.println(message);
        System.out.println("接收消息主题 : " + topic);
        System.out.println("接收消息Qos : " + message.getQos());
        System.out.println("接收消息内容 : " + new String(message.getPayload()));
    }
}

相关实践学习
快速体验阿里云云消息队列RocketMQ版
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
4月前
|
Java 开发工具
【Azure Storage Account】Java Code访问Storage Account File Share的上传和下载代码示例
本文介绍如何使用Java通过azure-storage-file-share SDK实现Azure文件共享的上传下载。包含依赖引入、客户端创建及完整示例代码,助你快速集成Azure File Share功能。
418 5
|
5月前
|
IDE Java 关系型数据库
Java 初学者学习路线(含代码示例)
本教程为Java初学者设计,涵盖基础语法、面向对象、集合、异常处理、文件操作、多线程、JDBC、Servlet及MyBatis等内容,每阶段配核心代码示例,强调动手实践,助你循序渐进掌握Java编程。
694 3
|
5月前
|
安全 Java 应用服务中间件
Spring Boot + Java 21:内存减少 60%,启动速度提高 30% — 零代码
通过调整三个JVM和Spring Boot配置开关,无需重写代码即可显著优化Java应用性能:内存减少60%,启动速度提升30%。适用于所有在JVM上运行API的生产团队,低成本实现高效能。
629 3
|
5月前
|
Java API 开发工具
【Azure Developer】Java代码实现获取Azure 资源的指标数据却报错 "invalid time interval input"
在使用 Java 调用虚拟机 API 获取指标数据时,因本地时区设置非 UTC,导致时间格式解析错误。解决方法是在代码中手动指定时区为 UTC,使用 `ZoneOffset.ofHours(0)` 并结合 `withOffsetSameInstant` 方法进行时区转换,从而避免因时区差异引发的时间格式问题。
294 3
|
4月前
|
Java 数据处理 API
为什么你的Java代码应该多用Stream?从循环到声明式的思维转变
为什么你的Java代码应该多用Stream?从循环到声明式的思维转变
303 115
|
6月前
|
人工智能 监控 安全
智慧工地解决方案,java智慧工地程序代码
智慧工地系统融合物联网、AI、大数据等技术,实现对施工现场“人、机、料、法、环”的全面智能监控与管理,提升安全、效率与决策水平。
195 2
|
4月前
|
安全 Java 编译器
为什么你的Java代码需要泛型?类型安全的艺术
为什么你的Java代码需要泛型?类型安全的艺术
219 98
|
5月前
|
Java
java入门代码示例
本文介绍Java入门基础,包含Hello World、变量类型、条件判断、循环及方法定义等核心语法示例,帮助初学者快速掌握Java编程基本结构与逻辑。
489 0
|
4月前
|
安全 Java 容器
告别繁琐判空:Optional让你的Java代码更优雅
告别繁琐判空:Optional让你的Java代码更优雅