Eclipse Paho MQTT客户端Java源码分析

简介: Eclipse Paho MQTT客户端Java源码分析

一、如何创建MQTT客户端


就像搭积木一样创建客户端


1.1 定义连接配置


负责类:MqttConnectOptions

职责:设置连接的用户名、密码、心跳、超时、重连等参数

源代码:

public static MqttConnectOptions getMqttConnectOptions(String userName,String password) {
      MqttConnectOptions options = new MqttConnectOptions();
      // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,
      // 这里设置为true表示每次连接到服务器都以新的身份连接
      options.setCleanSession(true);
      // 设置连接的用户名
      options.setUserName(userName);
      // 设置连接的密码
      options.setPassword(password.toCharArray());
      // 设置超时时间 单位为秒
      options.setConnectionTimeout(10);
      // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送心跳判断客户端是否在线,但这个方法并没有重连的机制
      options.setKeepAliveInterval(20);
      /*
       * 在尝试重新连接之前,它最初会等待 1 秒,对于每次失败的重新连接尝试,延迟将加倍,直到 2 分钟,
       * 此时延迟将保持在 2 分钟。
       */
      options.setAutomaticReconnect(true);
      // 设置“遗嘱”消息的话题,若客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息。
      options.setWill("willTopic", WILL_DATA, 2, false);
      return options;
  }

1.2 设置回调


负责类:MqttCallbackExtended

1666187164461.jpg

职责:通过回调监听数据通道的不同事件,包含连接成功、连接丢失、发送成功、收到数据

源代码:

  @Slf4j
  private static class BtcMqttCallback implements MqttCallbackExtended{
//        public MqttClient mqttClient;
//
//        public BtcMqttCallback(MqttClient mqttClient) {
//            this.mqttClient = mqttClient;
//        }
      @Override
      public void connectionLost(Throwable cause) {
         if (cause != null){
             log.error("连接丢失",cause);
         }
      }
      public void deliveryComplete(IMqttDeliveryToken token) {
          try {
              token.waitForCompletion();
              log.info("发送MQTT数据成功,消息是{}: {} : {}",token.getTopics(),token.getMessage().getQos(),token.getMessage());
          } catch (MqttException e) {
              e.printStackTrace();
          }
      }
      // 订阅后得到的消息会执行到这里面
      public void messageArrived(String topic, MqttMessage message) {
          String msg = new String(message.getPayload(), Charset.forName("UTF-8"));
          System.out.println("messageArrived() topic:" + topic);
          System.out.println(msg);
      }
      //连接成功就会调用,首次连接reconnect为false,重连为true
      @Override
      public void connectComplete(boolean reconnect, String serverURI) {
          log.info("连接成功: {}, 服务器地址是: {}",reconnect, serverURI);
      }
  }

1.3 开启连接、订阅及推送


负责类:MqttClient

职责:负责通道的建立、数据的订阅以及数据的推送

源代码:

 public static MqttClient createNewMqttClient(String url,String clientId,String password){
      MqttConnectOptions options =  getMqttConnectOptions("root",password);
      MqttClient mqttClient = null;
      try {
          mqttClient = new MqttClient(url,clientId);
          //在连接之前设置回调
          mqttClient.setCallback(new BtcMqttCallback());
          mqttClient.connect(options);
//            mqttClient.subscribe("topic1");
      } catch (MqttException e) {
          e.printStackTrace();
      }
      return mqttClient;
  }

1.4 MQTT消息类型


负责类:MqttWireMessage

职责:MQTT消息实体类,种类共计0x0F种

源代码:

public static final byte MESSAGE_TYPE_CONNECT = 1;
public static final byte MESSAGE_TYPE_CONNACK = 2;
public static final byte MESSAGE_TYPE_PUBLISH = 3;
public static final byte MESSAGE_TYPE_PUBACK = 4;
public static final byte MESSAGE_TYPE_PUBREC = 5;
public static final byte MESSAGE_TYPE_PUBREL = 6;
public static final byte MESSAGE_TYPE_PUBCOMP = 7;
public static final byte MESSAGE_TYPE_SUBSCRIBE = 8;
public static final byte MESSAGE_TYPE_SUBACK = 9;
public static final byte MESSAGE_TYPE_UNSUBSCRIBE = 10;
public static final byte MESSAGE_TYPE_UNSUBACK = 11;
public static final byte MESSAGE_TYPE_PINGREQ = 12;
public static final byte MESSAGE_TYPE_PINGRESP = 13;
public static final byte MESSAGE_TYPE_DISCONNECT = 14;
private static final String PACKET_NAMES[] = { "reserved", "CONNECT", "CONNACK", "PUBLISH",
  "PUBACK", "PUBREC", "PUBREL", "PUBCOMP", "SUBSCRIBE", "SUBACK",
  "UNSUBSCRIBE", "UNSUBACK", "PINGREQ", "PINGRESP", "DISCONNECT" };

对应的MQTT消息类型为:

1666187233433.jpg


二、到底谁在干活


2.1 从connect说起


源码来看,最后负责通信的职责类是ClientComms,该类就是抽象的数据通道,从MqttClient.connect()出发,一步一步进入到ClientComms

1666187287811.jpg

1666187298258.jpg


该类ConnectBGClientComms私有类,实现了Runnable接口,主要工作都在熟知的run方法里。


2.2 接着run


进入到ConnectBG的run方法里可以看到,网络模块以及MQTT数据的接收、发送和事件回调分别起了一个任务,都提交到默认为10个线程的线程池ExecutorService类中执行。由ConnectBG类名可知,将这些事件任务都放到后台执行,防止阻塞主线程,如socket创建就很费时。


receiver = new CommsReceiver(clientComms, clientState, tokenStore, networkModule.getInputStream());
receiver.start("MQTT Rec: "+getClient().getClientId(), executorService);
sender = new CommsSender(clientComms, clientState, tokenStore, networkModule.getOutputStream());
sender.start("MQTT Snd: "+getClient().getClientId(), executorService);
callback.start("MQTT Call: "+getClient().getClientId(), executorService);

2.3 执行


通过回调方式对通信事件处理,底层执行类是CommsCallback,直接看其run方法中的逻辑。

1666187415563.jpg

主要是看其中的handleActionComplete(MqttToken token)方法,进而进入ireActionEvent(token)方法里。


if ( mqttCallback != null 
    && token instanceof MqttDeliveryToken 
    && token.isComplete()) {
      mqttCallback.deliveryComplete((MqttDeliveryToken) token);
  }
  // Now call async action completion callbacks
  fireActionEvent(token);


我们会看到当数据发送事件成功时,会触发该事件的回调执行并携带执行结果的状态IMqttToken。


三、安全机制


3.1 重连机制


MqttClient的重连采用退避的方式每次重连的时间都会加倍,最初会等待 1 秒,对于每次失败的重新连接尝试,延迟将加倍,直到最大值 2 分钟。


3.2 心跳机制


3.3 超时机制


四、封装成工具类


思路是:

每次收到客户端上传到数据中心的消息,就创建一个MqttClient对象,并将其添加到并发列表Vector中,通过对客户端连接状态的判断,进行数据的处理。


相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
1月前
|
存储 消息中间件 安全
JUC组件实战:实现RRPC(Java与硬件通过MQTT的同步通信)
【10月更文挑战第9天】本文介绍了如何利用JUC组件实现Java服务与硬件通过MQTT的同步通信(RRPC)。通过模拟MQTT通信流程,使用`LinkedBlockingQueue`作为消息队列,详细讲解了消息发送、接收及响应的同步处理机制,包括任务超时处理和内存泄漏的预防措施。文中还提供了具体的类设计和方法实现,帮助理解同步通信的内部工作原理。
JUC组件实战:实现RRPC(Java与硬件通过MQTT的同步通信)
|
19天前
|
存储 Java API
Java实现导出多个excel表打包到zip文件中,供客户端另存为窗口下载
Java实现导出多个excel表打包到zip文件中,供客户端另存为窗口下载
25 4
|
1月前
|
Java
Java基础之 JDK8 HashMap 源码分析(中间写出与JDK7的区别)
这篇文章详细分析了Java中HashMap的源码,包括JDK8与JDK7的区别、构造函数、put和get方法的实现,以及位运算法的应用,并讨论了JDK8中的优化,如链表转红黑树的阈值和扩容机制。
23 1
|
1月前
|
分布式计算 Java Hadoop
Hadoop-30 ZooKeeper集群 JavaAPI 客户端 POM Java操作ZK 监听节点 监听数据变化 创建节点 删除节点
Hadoop-30 ZooKeeper集群 JavaAPI 客户端 POM Java操作ZK 监听节点 监听数据变化 创建节点 删除节点
62 1
|
1月前
|
消息中间件 存储 JSON
rabbitmq基础教程(ui,java,springamqp)
本文提供了RabbitMQ的基础教程,包括如何使用UI创建队列和交换机、Java代码操作RabbitMQ、Spring AMQP进行消息发送和接收,以及如何使用不同的交换机类型(fanout、direct、topic)进行消息路由。
24 0
rabbitmq基础教程(ui,java,springamqp)
|
2月前
|
JSON NoSQL Java
redis的java客户端的使用(Jedis、SpringDataRedis、SpringBoot整合redis、redisTemplate序列化及stringRedisTemplate序列化)
这篇文章介绍了在Java中使用Redis客户端的几种方法,包括Jedis、SpringDataRedis和SpringBoot整合Redis的操作。文章详细解释了Jedis的基本使用步骤,Jedis连接池的创建和使用,以及在SpringBoot项目中如何配置和使用RedisTemplate和StringRedisTemplate。此外,还探讨了RedisTemplate序列化的两种实践方案,包括默认的JDK序列化和自定义的JSON序列化,以及StringRedisTemplate的使用,它要求键和值都必须是String类型。
redis的java客户端的使用(Jedis、SpringDataRedis、SpringBoot整合redis、redisTemplate序列化及stringRedisTemplate序列化)
|
1月前
|
消息中间件 前端开发 Java
java高并发场景RabbitMQ的使用
java高并发场景RabbitMQ的使用
89 0
|
2月前
|
消息中间件 缓存 Java
RocketMQ的JAVA落地实战
RocketMQ作为一款高性能、高可靠、高实时、分布式特点的消息中间件,其核心作用主要体现在异步处理、削峰填谷以及系统解耦三个方面。
170 0
|
3月前
|
Java
Java使用FileInputStream&&FileOutputStream模拟客户端向服务器端上传文件(单线程)
Java使用FileInputStream&&FileOutputStream模拟客户端向服务器端上传文件(单线程)
83 1
|
3月前
|
网络协议 Java 应用服务中间件
Tomcat源码分析 (一)----- 手撕Java Web服务器需要准备哪些工作
本文探讨了后端开发中Web服务器的重要性,特别是Tomcat框架的地位与作用。通过解析Tomcat的内部机制,文章引导读者理解其复杂性,并提出了一种实践方式——手工构建简易Web服务器,以此加深对Web服务器运作原理的认识。文章还详细介绍了HTTP协议的工作流程,包括请求与响应的具体格式,并通过Socket编程在Java中的应用实例,展示了客户端与服务器间的数据交换过程。最后,通过一个简单的Java Web服务器实现案例,说明了如何处理HTTP请求及响应,强调虽然构建基本的Web服务器相对直接,但诸如Tomcat这样的成熟框架提供了更为丰富和必要的功能。

推荐镜像

更多