3. MQTT 发送消息
本段示例代码演示如何使用 MQTT 客户端发送普通消息和 P2P 的点对点消息,其中用到的工具 MacSignature 参考下文。
- public class MQTTSendMsg {
- public static void main(String[] args) throws IOException {
- /**
- * 设置当前用户私有的MQTT的接入点。例如此处示意使用XXX,实际使用请替换用户自己的接入点。接入点的获取方法是,在控制台申请MQTT实例,每个实例都会分配一个接入点域名。
- */
- final String broker ="tcp://XXXX.mqtt.aliyuncs.com:1883";
- /**
- * 设置阿里云的AccessKey,用于鉴权
- */
- final String acessKey ="XXXXXX";
- /**
- * 设置阿里云的SecretKey,用于鉴权
- */
- final String secretKey ="XXXXXXX";
- /**
- * 发消息使用的一级Topic,需要先在MQ控制台里申请
- */
- final String topic ="XXXX";
- /**
- * MQTT的ClientID,一般由两部分组成,GroupID@@@DeviceID
- * 其中GroupID在MQ控制台里申请
- * DeviceID由应用方设置,可能是设备编号等,需要唯一,否则服务端拒绝重复的ClientID连接
- */
- final String clientId ="GID_XXX@@@ClientID_XXXX";
- String sign;
- MemoryPersistence persistence = new MemoryPersistence();
- try {
- final MqttClient sampleClient = new MqttClient(broker, clientId, persistence);
- final MqttConnectOptions connOpts = new MqttConnectOptions();
- System.out.println("Connecting to broker: " + broker);
- /**
- * 计算签名,将签名作为MQTT的password。
- * 签名的计算方法,参考工具类MacSignature,第一个参数是ClientID的前半部分,即GroupID
- * 第二个参数阿里云的SecretKey
- */
- sign = MacSignature.macSignature(clientId.split("@@@")[0], secretKey);
- connOpts.setUserName(acessKey);
- connOpts.setServerURIs(new String[] { broker });
- connOpts.setPassword(sign.toCharArray());
- connOpts.setCleanSession(true);
- connOpts.setKeepAliveInterval(90);
- connOpts.setAutomaticReconnect(true);
- sampleClient.setCallback(new MqttCallbackExtended() {
- public void connectComplete(boolean reconnect, String serverURI) {
- System.out.println("connect success");
- //连接成功,需要上传客户端所有的订阅关系
- }
- public void connectionLost(Throwable throwable) {
- System.out.println("mqtt connection lost");
- }
- public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
- System.out.println("messageArrived:" + topic + "------" + new String(mqttMessage.getPayload()));
- }
- public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
- System.out.println("deliveryComplete:" + iMqttDeliveryToken.getMessageId());
- }
- });
- sampleClient.connect(connOpts);
- for (int i = 0; i < 10; i++) {
- try {
- String scontent = new Date()+"MQTT Test body" + i;
- //此处消息体只需要传入byte数组即可,对于其他类型的消息,请自行完成二进制数据的转换
- final MqttMessage message = new MqttMessage(scontent.getBytes());
- message.setQos(0);
- System.out.println(i+" pushed at "+new Date()+" "+ scontent);
- /**
- *消息发送到某个主题Topic,所有订阅这个Topic的设备都能收到这个消息。
- * 遵循MQTT的发布订阅规范,Topic也可以是多级Topic。此处设置了发送到二级Topic
- */
- sampleClient.publish(topic+"/notice/", message);
- /**
- * 如果发送P2P消息,二级Topic必须是“p2p”,三级Topic是目标的ClientID
- * 此处设置的三级Topic需要是接收方的ClientID
- */
- String p2pTopic =topic+"/p2p/GID_mqttdelay3@@@DEVICEID_001";
- sampleClient.publish(p2pTopic,message);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- } catch (Exception me) {
- me.printStackTrace();
- }
- }
- }
4. MQTT 接收消息
本段代码演示如何使用 MQTT 客户端订阅消息,接收普通的消息以及点对点消息。
- public class MQTTRecvMsg {
- public static void main(String[] args) throws IOException {
- /**
- * 设置当前用户私有的MQTT的接入点。例如此处示意使用XXX,实际使用请替换用户自己的接入点。接入点的获取方法是,在控制台申请MQTT实例,每个实例都会分配一个接入点域名。
- */
- final String broker ="tcp://XXXX.mqtt.aliyuncs.com:1883";
- /**
- * 设置阿里云的AccessKey,用于鉴权
- */
- final String acessKey ="XXXXXX";
- /**
- * 设置阿里云的SecretKey,用于鉴权
- */
- final String secretKey ="XXXXXXX";
- /**
- * 发消息使用的一级Topic,需要先在MQ控制台里申请
- */
- final String topic ="XXXX";
- /**
- * MQTT的ClientID,一般由两部分组成,GroupID@@@DeviceID
- * 其中GroupID在MQ控制台里申请
- * DeviceID由应用方设置,可能是设备编号等,需要唯一,否则服务端拒绝重复的ClientID连接
- */
- final String clientId ="GID_XXXX@@@ClientID_XXXXXX";
- String sign;
- MemoryPersistence persistence = new MemoryPersistence();
- try {
- final MqttClient sampleClient = new MqttClient(broker, clientId, persistence);
- final MqttConnectOptions connOpts = new MqttConnectOptions();
- System.out.println("Connecting to broker: " + broker);
- /**
- * 计算签名,将签名作为MQTT的password
- * 签名的计算方法,参考工具类MacSignature,第一个参数是ClientID的前半部分,即GroupID
- * 第二个参数阿里云的SecretKey
- */
- sign = MacSignature.macSignature(clientId.split("@@@")[0], secretKey);
- /**
- * 设置订阅方订阅的Topic集合,此处遵循MQTT的订阅规则,可以是一级Topic,二级Topic,P2P消息请订阅/p2p
- */
- final String[] topicFilters=new String[]{topic+"/notice/",topic+"/p2p"};
- final int[]qos={0,0};
- connOpts.setUserName(acessKey);
- connOpts.setServerURIs(new String[] { broker });
- connOpts.setPassword(sign.toCharArray());
- connOpts.setCleanSession(true);
- connOpts.setKeepAliveInterval(90);
- connOpts.setAutomaticReconnect(true);
- sampleClient.setCallback(new MqttCallbackExtended() {
- public void connectComplete(boolean reconnect, String serverURI) {
- System.out.println("connect success");
- //连接成功,需要上传客户端所有的订阅关系
- sampleClient.subscribe(topicFilters,qos);
- }
- public void connectionLost(Throwable throwable) {
- System.out.println("mqtt connection lost");
- }
- public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
- System.out.println("messageArrived:" + topic + "------" + new String(mqttMessage.getPayload()));
- }
- public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
- System.out.println("deliveryComplete:" + iMqttDeliveryToken.getMessageId());
- }
- });
- //客户端每次上线都必须上传自己所有涉及的订阅关系,否则可能会导致消息接收延迟
- sampleClient.connect(connOpts);
- //每个客户端最多允许存在30个订阅关系,超出限制可能会丢弃导致收不到部分消息
- sampleClient.subscribe(topicFilters,qos);
- Thread.sleep(Integer.MAX_VALUE);
- } catch (Exception me) {
- me.printStackTrace();
- }
- }
- }
上文代码用到的工具类 MacSignature.java 如下:
- public class MacSignature {
- /**
- * @param text 要签名的文本
- * @param secretKey 阿里云MQ SecretKey
- * @return 加密后的字符串
- * @throws InvalidKeyException
- * @throws NoSuchAlgorithmException
- */
- public static String macSignature(String text, String secretKey) throws InvalidKeyException, NoSuchAlgorithmException {
- Charset charset = Charset.forName("UTF-8");
- String algorithm = "HmacSHA1";
- Mac mac = Mac.getInstance(algorithm);
- mac.init(new SecretKeySpec(secretKey.getBytes(charset), algorithm));
- byte[] bytes = mac.doFinal(text.getBytes(charset));
- return new String(Base64.encodeBase64(bytes), charset);
- }
- /**
- * 发送方签名方法
- *
- * @param clientId MQTT ClientID
- * @param secretKey 阿里云MQ SecretKey
- * @return 加密后的字符串
- * @throws NoSuchAlgorithmException
- * @throws InvalidKeyException
- */
- public static String publishSignature(String clientId, String secretKey) throws NoSuchAlgorithmException, InvalidKeyException {
- return macSignature(clientId, secretKey);
- }
- /**
- * 订阅方签名方法
- *
- * @param topics 要订阅的Topic集合
- * @param clientId MQTT ClientID
- * @param secretKey 阿里云MQ SecretKey
- * @return 加密后的字符串
- * @throws NoSuchAlgorithmException
- * @throws InvalidKeyException
- */
- public static String subSignature(List<String> topics, String clientId, String secretKey) throws NoSuchAlgorithmException, InvalidKeyException {
- Collections.sort(topics); //以字典顺序排序
- String topicText = "";
- for (String topic : topics) {
- topicText += topic + "\n";
- }
- String text = topicText + clientId;
- return macSignature(text, secretKey);
- }
- /**
- * 订阅方签名方法
- *
- * @param topic 要订阅的Topic
- * @param clientId MQTT ClientID
- * @param secretKey 阿里云MQ SecretKey
- * @return 加密后的字符串
- * @throws NoSuchAlgorithmException
- * @throws InvalidKeyException
- */
- public static String subSignature(String topic, String clientId, String secretKey) throws NoSuchAlgorithmException, InvalidKeyException {
- List<String> topics = new ArrayList<String>();
- topics.add(topic);
- return subSignature(topics, clientId, secretKey);
- }
- }