开发者社区> 问答> 正文

MQTT 客户端收发 MQTT 消息(2)



3. MQTT 发送消息


本段示例代码演示如何使用 MQTT 客户端发送普通消息和 P2P 的点对点消息,其中用到的工具 MacSignature 参考下文。

  1. public class MQTTSendMsg {
  2.     public static void main(String[] args) throws IOException {
  3.        /**
  4.          * 设置当前用户私有的MQTT的接入点。例如此处示意使用XXX,实际使用请替换用户自己的接入点。接入点的获取方法是,在控制台申请MQTT实例,每个实例都会分配一个接入点域名。
  5.          */
  6.         final String broker ="tcp://XXXX.mqtt.aliyuncs.com:1883";
  7.         /**
  8.          * 设置阿里云的AccessKey,用于鉴权
  9.          */
  10.         final String acessKey ="XXXXXX";
  11.         /**
  12.          * 设置阿里云的SecretKey,用于鉴权
  13.          */
  14.         final String secretKey ="XXXXXXX";
  15.         /**
  16.          * 发消息使用的一级Topic,需要先在MQ控制台里申请
  17.          */
  18.         final String topic ="XXXX";
  19.         /**
  20.          * MQTT的ClientID,一般由两部分组成,GroupID@@@DeviceID
  21.          * 其中GroupID在MQ控制台里申请
  22.          * DeviceID由应用方设置,可能是设备编号等,需要唯一,否则服务端拒绝重复的ClientID连接
  23.          */
  24.         final String clientId ="GID_XXX@@@ClientID_XXXX";
  25.         String sign;
  26.         MemoryPersistence persistence = new MemoryPersistence();
  27.         try {
  28.             final MqttClient sampleClient = new MqttClient(broker, clientId, persistence);
  29.             final MqttConnectOptions connOpts = new MqttConnectOptions();
  30.             System.out.println("Connecting to broker: " + broker);
  31.             /**
  32.              * 计算签名,将签名作为MQTT的password。
  33.              * 签名的计算方法,参考工具类MacSignature,第一个参数是ClientID的前半部分,即GroupID
  34.              * 第二个参数阿里云的SecretKey
  35.              */
  36.             sign = MacSignature.macSignature(clientId.split("@@@")[0], secretKey);
  37.             connOpts.setUserName(acessKey);
  38.             connOpts.setServerURIs(new String[] { broker });
  39.             connOpts.setPassword(sign.toCharArray());
  40.             connOpts.setCleanSession(true);
  41.             connOpts.setKeepAliveInterval(90);
  42.             connOpts.setAutomaticReconnect(true);
  43.             sampleClient.setCallback(new MqttCallbackExtended() {
  44.                 public void connectComplete(boolean reconnect, String serverURI) {
  45.                     System.out.println("connect success");
  46.                     //连接成功,需要上传客户端所有的订阅关系
  47.                 }
  48.                 public void connectionLost(Throwable throwable) {
  49.                     System.out.println("mqtt connection lost");
  50.                 }
  51.                 public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
  52.                     System.out.println("messageArrived:" + topic + "------" + new String(mqttMessage.getPayload()));
  53.                 }
  54.                 public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
  55.                     System.out.println("deliveryComplete:" + iMqttDeliveryToken.getMessageId());
  56.                 }
  57.             });
  58.             sampleClient.connect(connOpts);
  59.             for (int i = 0; i < 10; i++) {
  60.                 try {
  61.                     String scontent = new Date()+"MQTT Test body" + i;
  62.                     //此处消息体只需要传入byte数组即可,对于其他类型的消息,请自行完成二进制数据的转换
  63.                     final MqttMessage message = new MqttMessage(scontent.getBytes());
  64.                     message.setQos(0);
  65.                     System.out.println(i+" pushed at "+new Date()+" "+ scontent);
  66.                     /**
  67.                      *消息发送到某个主题Topic,所有订阅这个Topic的设备都能收到这个消息。
  68.                      * 遵循MQTT的发布订阅规范,Topic也可以是多级Topic。此处设置了发送到二级Topic
  69.                      */
  70.                     sampleClient.publish(topic+"/notice/", message);
  71.                     /**
  72.                      * 如果发送P2P消息,二级Topic必须是“p2p”,三级Topic是目标的ClientID
  73.                      * 此处设置的三级Topic需要是接收方的ClientID
  74.                      */
  75.                     String p2pTopic =topic+"/p2p/GID_mqttdelay3@@@DEVICEID_001";
  76.                     sampleClient.publish(p2pTopic,message);
  77.                 } catch (Exception e) {
  78.                     e.printStackTrace();
  79.                 }
  80.             }
  81.         } catch (Exception me) {
  82.             me.printStackTrace();
  83.         }
  84.     }
  85. }


4. MQTT 接收消息


本段代码演示如何使用 MQTT 客户端订阅消息,接收普通的消息以及点对点消息。
  1. public class MQTTRecvMsg {
  2.    public static void main(String[] args) throws IOException {
  3.        /**
  4.          * 设置当前用户私有的MQTT的接入点。例如此处示意使用XXX,实际使用请替换用户自己的接入点。接入点的获取方法是,在控制台申请MQTT实例,每个实例都会分配一个接入点域名。
  5.          */
  6.         final String broker ="tcp://XXXX.mqtt.aliyuncs.com:1883";
  7.         /**
  8.          * 设置阿里云的AccessKey,用于鉴权
  9.          */
  10.         final String acessKey ="XXXXXX";
  11.         /**
  12.          * 设置阿里云的SecretKey,用于鉴权
  13.          */
  14.         final String secretKey ="XXXXXXX";
  15.         /**
  16.          * 发消息使用的一级Topic,需要先在MQ控制台里申请
  17.          */
  18.         final String topic ="XXXX";
  19.         /**
  20.          * MQTT的ClientID,一般由两部分组成,GroupID@@@DeviceID
  21.          * 其中GroupID在MQ控制台里申请
  22.          * DeviceID由应用方设置,可能是设备编号等,需要唯一,否则服务端拒绝重复的ClientID连接
  23.          */
  24.         final String clientId ="GID_XXXX@@@ClientID_XXXXXX";
  25.         String sign;
  26.         MemoryPersistence persistence = new MemoryPersistence();
  27.         try {
  28.             final MqttClient sampleClient = new MqttClient(broker, clientId, persistence);
  29.             final MqttConnectOptions connOpts = new MqttConnectOptions();
  30.             System.out.println("Connecting to broker: " + broker);
  31.             /**
  32.              * 计算签名,将签名作为MQTT的password
  33.              * 签名的计算方法,参考工具类MacSignature,第一个参数是ClientID的前半部分,即GroupID
  34.              * 第二个参数阿里云的SecretKey
  35.              */
  36.             sign = MacSignature.macSignature(clientId.split("@@@")[0], secretKey);
  37.             /**
  38.              * 设置订阅方订阅的Topic集合,此处遵循MQTT的订阅规则,可以是一级Topic,二级Topic,P2P消息请订阅/p2p
  39.              */
  40.             final String[] topicFilters=new String[]{topic+"/notice/",topic+"/p2p"};
  41.             final int[]qos={0,0};
  42.             connOpts.setUserName(acessKey);
  43.             connOpts.setServerURIs(new String[] { broker });
  44.             connOpts.setPassword(sign.toCharArray());
  45.             connOpts.setCleanSession(true);
  46.             connOpts.setKeepAliveInterval(90);
  47.             connOpts.setAutomaticReconnect(true);
  48.             sampleClient.setCallback(new MqttCallbackExtended() {
  49.                 public void connectComplete(boolean reconnect, String serverURI) {
  50.                     System.out.println("connect success");
  51.                     //连接成功,需要上传客户端所有的订阅关系
  52.                     sampleClient.subscribe(topicFilters,qos);
  53.                 }
  54.                 public void connectionLost(Throwable throwable) {
  55.                     System.out.println("mqtt connection lost");
  56.                 }
  57.                 public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
  58.                     System.out.println("messageArrived:" + topic + "------" + new String(mqttMessage.getPayload()));
  59.                 }
  60.                 public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
  61.                     System.out.println("deliveryComplete:" + iMqttDeliveryToken.getMessageId());
  62.                 }
  63.             });
  64.             //客户端每次上线都必须上传自己所有涉及的订阅关系,否则可能会导致消息接收延迟
  65.             sampleClient.connect(connOpts);
  66.             //每个客户端最多允许存在30个订阅关系,超出限制可能会丢弃导致收不到部分消息
  67.             sampleClient.subscribe(topicFilters,qos);
  68.             Thread.sleep(Integer.MAX_VALUE);
  69.         } catch (Exception me) {
  70.             me.printStackTrace();
  71.         }
  72.     }
  73. }

上文代码用到的工具类 MacSignature.java 如下:
  1. public class MacSignature {
  2.     /**
  3.      * @param text      要签名的文本
  4.      * @param secretKey 阿里云MQ SecretKey
  5.      * @return 加密后的字符串
  6.      * @throws InvalidKeyException
  7.      * @throws NoSuchAlgorithmException
  8.      */
  9.     public static String macSignature(String text, String secretKey) throws InvalidKeyException, NoSuchAlgorithmException {
  10.         Charset charset = Charset.forName("UTF-8");
  11.         String algorithm = "HmacSHA1";
  12.         Mac mac = Mac.getInstance(algorithm);
  13.         mac.init(new SecretKeySpec(secretKey.getBytes(charset), algorithm));
  14.         byte[] bytes = mac.doFinal(text.getBytes(charset));
  15.         return new String(Base64.encodeBase64(bytes), charset);
  16.     }
  17.     /**
  18.      * 发送方签名方法
  19.      *
  20.      * @param clientId  MQTT ClientID
  21.      * @param secretKey 阿里云MQ SecretKey
  22.      * @return 加密后的字符串
  23.      * @throws NoSuchAlgorithmException
  24.      * @throws InvalidKeyException
  25.      */
  26.     public static String publishSignature(String clientId, String secretKey) throws NoSuchAlgorithmException, InvalidKeyException {
  27.         return macSignature(clientId, secretKey);
  28.     }
  29.     /**
  30.      * 订阅方签名方法
  31.      *
  32.      * @param topics    要订阅的Topic集合
  33.      * @param clientId  MQTT ClientID
  34.      * @param secretKey 阿里云MQ SecretKey
  35.      * @return 加密后的字符串
  36.      * @throws NoSuchAlgorithmException
  37.      * @throws InvalidKeyException
  38.      */
  39.     public static String subSignature(List<String> topics, String clientId, String secretKey) throws NoSuchAlgorithmException, InvalidKeyException {
  40.         Collections.sort(topics); //以字典顺序排序
  41.         String topicText = "";
  42.         for (String topic : topics) {
  43.             topicText += topic + "\n";
  44.         }
  45.         String text = topicText + clientId;
  46.         return macSignature(text, secretKey);
  47.     }
  48.     /**
  49.      * 订阅方签名方法
  50.      *
  51.      * @param topic     要订阅的Topic
  52.      * @param clientId  MQTT ClientID
  53.      * @param secretKey 阿里云MQ SecretKey
  54.      * @return 加密后的字符串
  55.      * @throws NoSuchAlgorithmException
  56.      * @throws InvalidKeyException
  57.      */
  58.     public static String subSignature(String topic, String clientId, String secretKey) throws NoSuchAlgorithmException, InvalidKeyException {
  59.         List<String> topics = new ArrayList<String>();
  60.         topics.add(topic);
  61.         return subSignature(topics, clientId, secretKey);
  62.     }
  63. }

展开
收起
猫饭先生 2017-10-27 10:57:53 6027 0
0 条回答
写回答
取消 提交回答
问答排行榜
最热
最新

相关电子书

更多
RocketMQ Client-GO 介绍 立即下载
RocketMQ Prometheus Exporter 打造定制化 DevOps 平台 立即下载
基于 RocketMQ Prometheus Exporter 打造定制化 DevOps 平台 立即下载