一、如何创建MQTT客户端
就像搭积木一样创建客户端
1.1 定义连接配置
负责类:MqttConnectOptions
职责:设置连接的用户名、密码、心跳、超时、重连等参数
源代码:
public static MqttConnectOptions getMqttConnectOptions(String userName,String password) { MqttConnectOptions options = new MqttConnectOptions(); // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录, // 这里设置为true表示每次连接到服务器都以新的身份连接 options.setCleanSession(true); // 设置连接的用户名 options.setUserName(userName); // 设置连接的密码 options.setPassword(password.toCharArray()); // 设置超时时间 单位为秒 options.setConnectionTimeout(10); // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送心跳判断客户端是否在线,但这个方法并没有重连的机制 options.setKeepAliveInterval(20); /* * 在尝试重新连接之前,它最初会等待 1 秒,对于每次失败的重新连接尝试,延迟将加倍,直到 2 分钟, * 此时延迟将保持在 2 分钟。 */ options.setAutomaticReconnect(true); // 设置“遗嘱”消息的话题,若客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息。 options.setWill("willTopic", WILL_DATA, 2, false); return options; }
1.2 设置回调
负责类:MqttCallbackExtended
职责:通过回调监听数据通道的不同事件,包含连接成功、连接丢失、发送成功、收到数据
源代码:
@Slf4j private static class BtcMqttCallback implements MqttCallbackExtended{ // public MqttClient mqttClient; // // public BtcMqttCallback(MqttClient mqttClient) { // this.mqttClient = mqttClient; // } @Override public void connectionLost(Throwable cause) { if (cause != null){ log.error("连接丢失",cause); } } public void deliveryComplete(IMqttDeliveryToken token) { try { token.waitForCompletion(); log.info("发送MQTT数据成功,消息是{}: {} : {}",token.getTopics(),token.getMessage().getQos(),token.getMessage()); } catch (MqttException e) { e.printStackTrace(); } } // 订阅后得到的消息会执行到这里面 public void messageArrived(String topic, MqttMessage message) { String msg = new String(message.getPayload(), Charset.forName("UTF-8")); System.out.println("messageArrived() topic:" + topic); System.out.println(msg); } //连接成功就会调用,首次连接reconnect为false,重连为true @Override public void connectComplete(boolean reconnect, String serverURI) { log.info("连接成功: {}, 服务器地址是: {}",reconnect, serverURI); } }
1.3 开启连接、订阅及推送
负责类:MqttClient
职责:负责通道的建立、数据的订阅以及数据的推送
源代码:
public static MqttClient createNewMqttClient(String url,String clientId,String password){ MqttConnectOptions options = getMqttConnectOptions("root",password); MqttClient mqttClient = null; try { mqttClient = new MqttClient(url,clientId); //在连接之前设置回调 mqttClient.setCallback(new BtcMqttCallback()); mqttClient.connect(options); // mqttClient.subscribe("topic1"); } catch (MqttException e) { e.printStackTrace(); } return mqttClient; }
1.4 MQTT消息类型
负责类:MqttWireMessage
职责:MQTT消息实体类,种类共计0x0F种
源代码:
public static final byte MESSAGE_TYPE_CONNECT = 1; public static final byte MESSAGE_TYPE_CONNACK = 2; public static final byte MESSAGE_TYPE_PUBLISH = 3; public static final byte MESSAGE_TYPE_PUBACK = 4; public static final byte MESSAGE_TYPE_PUBREC = 5; public static final byte MESSAGE_TYPE_PUBREL = 6; public static final byte MESSAGE_TYPE_PUBCOMP = 7; public static final byte MESSAGE_TYPE_SUBSCRIBE = 8; public static final byte MESSAGE_TYPE_SUBACK = 9; public static final byte MESSAGE_TYPE_UNSUBSCRIBE = 10; public static final byte MESSAGE_TYPE_UNSUBACK = 11; public static final byte MESSAGE_TYPE_PINGREQ = 12; public static final byte MESSAGE_TYPE_PINGRESP = 13; public static final byte MESSAGE_TYPE_DISCONNECT = 14; private static final String PACKET_NAMES[] = { "reserved", "CONNECT", "CONNACK", "PUBLISH", "PUBACK", "PUBREC", "PUBREL", "PUBCOMP", "SUBSCRIBE", "SUBACK", "UNSUBSCRIBE", "UNSUBACK", "PINGREQ", "PINGRESP", "DISCONNECT" };
对应的MQTT消息类型为:
二、到底谁在干活
2.1 从connect说起
从源码来看,最后负责通信的职责类是ClientComms
,该类就是抽象的数据通道,从MqttClient.connect()
出发,一步一步进入到ClientComms
。
该类ConnectBG
是ClientComms
私有类,实现了Runnable接口,主要工作都在熟知的run方法里。
2.2 接着run
进入到ConnectBG
的run方法里可以看到,网络模块以及MQTT数据的接收、发送和事件回调分别起了一个任务,都提交到默认为10个线程的线程池ExecutorService
类中执行。由ConnectBG
类名可知,将这些事件任务都放到后台执行,防止阻塞主线程,如socket创建就很费时。
receiver = new CommsReceiver(clientComms, clientState, tokenStore, networkModule.getInputStream()); receiver.start("MQTT Rec: "+getClient().getClientId(), executorService); sender = new CommsSender(clientComms, clientState, tokenStore, networkModule.getOutputStream()); sender.start("MQTT Snd: "+getClient().getClientId(), executorService); callback.start("MQTT Call: "+getClient().getClientId(), executorService);
2.3 执行
通过回调方式对通信事件处理,底层执行类是CommsCallback
,直接看其run
方法中的逻辑。
主要是看其中的handleActionComplete(MqttToken token)
方法,进而进入ireActionEvent(token)
方法里。
if ( mqttCallback != null && token instanceof MqttDeliveryToken && token.isComplete()) { mqttCallback.deliveryComplete((MqttDeliveryToken) token); } // Now call async action completion callbacks fireActionEvent(token);
我们会看到当数据发送事件成功时,会触发该事件的回调执行并携带执行结果的状态IMqttToken。
三、安全机制
3.1 重连机制
MqttClient
的重连采用退避的方式每次重连的时间都会加倍,最初会等待 1 秒,对于每次失败的重新连接尝试,延迟将加倍,直到最大值 2 分钟。
3.2 心跳机制
3.3 超时机制
四、封装成工具类
思路是:
每次收到客户端上传到数据中心的消息,就创建一个MqttClient
对象,并将其添加到并发列表Vector
中,通过对客户端连接状态的判断,进行数据的处理。