阿里云微服务消息队列(MQTT For IoT)使用Demo

简介: 微消息队列 MQTT 版是阿里云推出的一款面向移动互联网以及物联网领域的轻量级消息中间件。如果说传统的消息队列中间件一般应用于微服务之间,那么适用于物联网的微消息队列 MQTT 版则实现了端与云之间的消息传递和真正意义上的万物互联。本文结合最新推出的V3版本实例介绍产品的具体使用流程。

Step By Step

服务使用大图

图片.png

  • 1、不同设备通过SDK和平台侧建立连接,实现设备与平台侧的交互通信;
  • 2、通过规则流转功能,将设备上报的消息流转到MQ Topic,也可以通过MQ Topic向MQTT Topic下发消息;
  • 3、基于Server端管控API,实现消息的直接下发、设备在线状态查询以及Group的创建。
一、MQTT Server创建(公网区域)

1、创建实例
图片.png

图片.png

2、Topic和Group创建
图片.png

图片.png

图片.png

二、设备端Java Code Sample

1、pom.xml

 <dependencies>
            <dependency>
                <groupId>commons-codec</groupId>
                <artifactId>commons-codec</artifactId>
                <version>1.10</version>
            </dependency>
            <dependency>
                <groupId>org.eclipse.paho</groupId>
                <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
                <version>1.2.2</version>
            </dependency>
            <dependency>
                <groupId>org.apache.httpcomponents</groupId>
                <artifactId>httpclient</artifactId>
                <version>4.5.2</version>
            </dependency>
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>1.2.48</version>
            </dependency>
            <dependency>
                <groupId>com.aliyun</groupId>
                <artifactId>aliyun-java-sdk-onsmqtt</artifactId>
                <version>1.0.3</version>
            </dependency>
            <dependency>
                <groupId>com.aliyun</groupId>
                <artifactId>aliyun-java-sdk-core</artifactId>
                <version>4.5.0</version>
            </dependency>
        </dependencies>

2、Device Code Sample

import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

public class MQ4IoTSendMessageToMQ4IoTUseSignatureMode {

    public static void main(String[] args) throws Exception {
        /**
         * MQ4IOT 实例 ID,购买后控制台获取
         */
        String instanceId = "post-cn-n6w*********";
        /**
         * 接入点地址,购买 MQ4IOT 实例,且配置完成后即可获取,接入点地址必须填写分配的域名,不得使用 IP 地址直接连接,否则可能会导致客户端异常。
         */
        String endPoint = "post-cn-n6w********.mqtt.aliyuncs.com";
        /**
         * 账号 accesskey,从账号系统控制台获取
         */
        String accessKey = "LTAIOZZg********";
        /**
         * 账号 secretKey,从账号系统控制台获取,仅在Signature鉴权模式下需要设置
         */
        String secretKey = "v7CjUJCMk7j9aK****************";
        /**
         * MQ4IOT clientId,由业务系统分配,需要保证每个 tcp 连接都不一样,保证全局唯一,如果不同的客户端对象(tcp 连接)使用了相同的 clientId 会导致连接异常断开。
         * clientId 由两部分组成,格式为 GroupID@@@DeviceId,其中 groupId 在 MQ4IOT 控制台申请,DeviceId 由业务方自己设置,clientId 总长度不得超过64个字符。
         */
        String clientId = "GID_MQTT_Client1@@@device1";
        /**
         * MQ4IOT 消息的一级 topic,需要在控制台申请才能使用。
         * 如果使用了没有申请或者没有被授权的 topic 会导致鉴权失败,服务端会断开客户端连接。
         */
        final String parentTopic = "MQTT_Topic";
        /**
         * MQ4IOT支持子级 topic,用来做自定义的过滤,此处为示意,可以填写任何字符串,具体参考https://help.aliyun.com/document_detail/42420.html?spm=a2c4g.11186623.6.544.1ea529cfAO5zV3
         * 需要注意的是,完整的 topic 长度不得超过128个字符。
         */
        final String mq4IotTopic = parentTopic + "/" + "testMq4Iot";
        /**
         * QoS参数代表传输质量,可选0,1,2,根据实际需求合理设置,具体参考 https://help.aliyun.com/document_detail/42420.html?spm=a2c4g.11186623.6.544.1ea529cfAO5zV3
         */
        final int qosLevel = 0;
        ConnectionOptionWrapper connectionOptionWrapper = new ConnectionOptionWrapper(instanceId, accessKey, secretKey, clientId);
        final MemoryPersistence memoryPersistence = new MemoryPersistence();
        /**
         * 客户端使用的协议和端口必须匹配,具体参考文档 https://help.aliyun.com/document_detail/44866.html?spm=a2c4g.11186623.6.552.25302386RcuYFB
         * 如果是 SSL 加密则设置ssl://endpoint:8883
         */
        final MqttClient mqttClient = new MqttClient("tcp://" + endPoint + ":1883", clientId, memoryPersistence);
        /**
         * 客户端设置好发送超时时间,防止无限阻塞
         */
        mqttClient.setTimeToWait(5000);
        final ExecutorService executorService = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
        mqttClient.setCallback(new MqttCallbackExtended() {
            @Override
            public void connectComplete(boolean reconnect, String serverURI) {
                /**
                 * 客户端连接成功后就需要尽快订阅需要的 topic
                 */
                System.out.println("connect success");
                executorService.submit(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            final String topicFilter[] = {mq4IotTopic};
                            final int[] qos = {qosLevel};
                            mqttClient.subscribe(topicFilter, qos);
                        } catch (MqttException e) {
                            e.printStackTrace();
                        }
                    }
                });
            }

            @Override
            public void connectionLost(Throwable throwable) {
                throwable.printStackTrace();
            }

            @Override
            public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
                /**
                 * 消费消息的回调接口,需要确保该接口不抛异常,该接口运行返回即代表消息消费成功。
                 * 消费消息需要保证在规定时间内完成,如果消费耗时超过服务端约定的超时时间,对于可靠传输的模式,服务端可能会重试推送,业务需要做好幂等去重处理。超时时间约定参考限制
                 * https://help.aliyun.com/document_detail/63620.html?spm=a2c4g.11186623.6.546.229f1f6ago55Fj
                 */
                System.out.println(
                        "receive msg from topic " + s + " , body is " + new String(mqttMessage.getPayload()));
            }

            @Override
            public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
                System.out.println("send msg succeed topic is : " + iMqttDeliveryToken.getTopics()[0]);
            }
        });
        mqttClient.connect(connectionOptionWrapper.getMqttConnectOptions());
        for (int i = 0; i < 1; i++) {
            MqttMessage message = new MqttMessage("hello mq4Iot pub sub msg".getBytes());
            message.setQos(qosLevel);
            /**
             *  发送普通消息时,topic 必须和接收方订阅的 topic 一致,或者符合通配符匹配规则
             */
            mqttClient.publish(mq4IotTopic, message);
            /**
             * MQ4IoT支持点对点消息,即如果发送方明确知道该消息只需要给特定的一个设备接收,且知道对端的 clientId,则可以直接发送点对点消息。
             * 点对点消息不需要经过订阅关系匹配,可以简化订阅方的逻辑。点对点消息的 topic 格式规范是  {{parentTopic}}/p2p/{{targetClientId}}
             */
            final String p2pSendTopic = parentTopic + "/p2p/" + clientId;
            message = new MqttMessage("hello mq4Iot p2p msg".getBytes());
            message.setQos(qosLevel);
            mqttClient.publish(p2pSendTopic, message);
        }
        Thread.sleep(Long.MAX_VALUE);
    }
}

工具类:util

3、测试效果
图片.png

4、消息流转轨迹查询
图片.png

三、规则流转测试

1、MQ创建三个不同类型的Topic
图片.png

2、创建Group,用于MQ侧消费消息
图片.png

3、MQTT侧配置流转规则
图片.png

图片.png

图片.png

图片.png

4、MQ侧代码测试

4.1 pom.xml

        <dependency>
            <groupId>com.aliyun.openservices</groupId>
            <artifactId>ons-client</artifactId>
            <version>1.8.7.1.Final</version>
        </dependency>

4.2 Code Sample

import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Consumer;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import java.util.Properties;

public class ConsumerTest {
    public static void main(String[] args) {
        Properties properties = new Properties();
        // 您在控制台创建的 Group ID。
        properties.put(PropertyKeyConst.GROUP_ID, "GID_MessageConsumer");
        // AccessKey ID 阿里云身份验证,在阿里云 RAM 控制台创建。
        properties.put(PropertyKeyConst.AccessKey, "LTAIOZZg********");
        // Accesskey Secret 阿里云身份验证,在阿里云服 RAM 控制台创建。
        properties.put(PropertyKeyConst.SecretKey, "v7CjUJCMk7j9aK****************");
        // 设置 TCP 接入域名,进入控制台的实例详情页面的 TCP 协议客户端接入点区域查看。
        properties.put(PropertyKeyConst.NAMESRV_ADDR, "http://MQ_INST_***************_BcLPQ2p0.mq-internet-access.mq-internet.aliyuncs.com:80");
        // 集群订阅方式 (默认)。
        // properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.CLUSTERING);
        // 广播订阅方式。
        // properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING);

        Consumer consumer = ONSFactory.createConsumer(properties);
        //1、订阅设备上行消息
        consumer.subscribe("MessageFromMQTT", "*", new MessageListener() { //订阅多个 Tag。
            public Action consume(Message message, ConsumeContext context) {
                System.out.println("Receive: " + message);
                return Action.CommitMessage;
            }
        });

        //2、订阅设备上下线消息
        consumer.subscribe("DevcieOnlineAndOffline", "*", new MessageListener() { //订阅全部 Tag。
            public Action consume(Message message, ConsumeContext context) {
                System.out.println("Receive: " + message);
                return Action.CommitMessage;
            }
        });

        consumer.start();
        System.out.println("Consumer Started");
    }
}

4.3 测试效果(先启动消费端,然后设备端上行消息)

图片.png

4.4 通过MQ发送消息到MQTT

import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.SendResult;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import java.util.Date;
import java.util.Properties;

public class SendMQMessageToMQTT {

    public static void main(String[] args) {

            Properties properties = new Properties();
            // AccessKeyId 阿里云身份验证,在阿里云用户信息管理控制台获取。
            properties.put(PropertyKeyConst.AccessKey,"LTAIOZZg**********");
            // AccessKeySecret 阿里云身份验证,在阿里云用户信息管理控制台获取。
            properties.put(PropertyKeyConst.SecretKey, "v7CjUJCMk7j9aK****************");
            //设置发送超时时间,单位毫秒。
            properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000");
            // 设置 TCP 接入域名,进入控制台的实例详情页面的 TCP 协议客户端接入点区域查看。
            properties.put(PropertyKeyConst.NAMESRV_ADDR, "http://MQ_INST_********_BcLPQ2p0.mq-internet-access.mq-internet.aliyuncs.com:80");
            Producer producer = ONSFactory.createProducer(properties);

            // mqttSecondTopic:https://help.aliyun.com/document_detail/112971.html?spm=a2c4g.11186623.6.579.403242ca4pOcpC
            properties.put("mqttSecondTopic","testMq4Iot");

            // 在发送消息前,必须调用 start 方法来启动 Producer,只需调用一次即可。
            producer.start();

            //循环发送消息。
            for (int i = 0; i < 1; i++){
                Message msg = new Message("MessageToMQTT","","MQ Message To MQTT".getBytes());
                msg.setKey("ORDERID_" + i);
                msg.setUserProperties(properties);

                try {
                    SendResult sendResult = producer.send(msg);
                    // 同步发送消息,只要不抛异常就是成功。
                    if (sendResult != null) {
                        System.out.println(new Date() + " Send mq message success. Topic is:" + msg.getTopic() + " msgId is: " + sendResult.getMessageId());
                    }
                }
                catch (Exception e) {
                    // 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理。
                    System.out.println(new Date() + " Send mq message failed. Topic is:" + msg.getTopic());
                    e.printStackTrace();
                }
            }

            // 在应用退出前,销毁 Producer 对象。
            // 注意:如果不销毁也没有问题。
            producer.shutdown();
        }

    }

4.5 The Result

图片.png

图片.png

4.6 消息轨迹查询

图片.png

四、MQTT云端API测试

1、pom.xml

        <dependency>
            <groupId>com.aliyun</groupId>
            <artifactId>aliyun-java-sdk-core</artifactId>
            <version>4.5.6</version>
        </dependency>
        <dependency>
            <groupId>com.aliyun</groupId>
            <artifactId>aliyun-java-sdk-onsmqtt</artifactId>
            <version>1.0.4</version>
        </dependency>

2、发送消息Code Sample

import com.aliyuncs.DefaultAcsClient;
import com.aliyuncs.IAcsClient;
import com.aliyuncs.exceptions.ClientException;
import com.aliyuncs.exceptions.ServerException;
import com.aliyuncs.profile.DefaultProfile;
import com.google.gson.Gson;
import com.aliyuncs.onsmqtt.model.v20200420.*;

public class SendMessage {

    public static void main(String[] args) {
        DefaultProfile profile = DefaultProfile.getProfile("mq-internet-access", "LTAIOZZg********", "v7CjUJCMk7j9aK****************");
        IAcsClient client = new DefaultAcsClient(profile);

        SendMessageRequest request = new SendMessageRequest();
        request.setRegionId("mq-internet-access");
        request.setInstanceId("post-cn-n6w********");
        request.setPayload("message from manager api!");
        request.setMqttTopic("MQTT_Topic/testMq4Iot");

        try {
            SendMessageResponse response = client.getAcsResponse(request);
            System.out.println(new Gson().toJson(response));
        } catch (ServerException e) {
            e.printStackTrace();
        } catch (ClientException e) {
            System.out.println("ErrCode:" + e.getErrCode());
            System.out.println("ErrMsg:" + e.getErrMsg());
            System.out.println("RequestId:" + e.getRequestId());
        }
    }
}

3、测试效果
图片.png
图片.png

4、查询设备状态Code Sample

import com.aliyuncs.DefaultAcsClient;
import com.aliyuncs.IAcsClient;
import com.aliyuncs.exceptions.ClientException;
import com.aliyuncs.exceptions.ServerException;
import com.aliyuncs.profile.DefaultProfile;
import com.google.gson.Gson;
import com.aliyuncs.onsmqtt.model.v20200420.*;

public class QuerySessionByClientId {

    public static void main(String[] args) {
        DefaultProfile profile = DefaultProfile.getProfile("mq-internet-access", "LTAIOZZg********", "v7CjUJCMk7j9aK****************");
        IAcsClient client = new DefaultAcsClient(profile);

        QuerySessionByClientIdRequest request = new QuerySessionByClientIdRequest();
        request.setRegionId("mq-internet-access");
        request.setInstanceId("post-cn-n6w********");
        request.setClientId("GID_MQTT_Client1@@@device1");

        try {
            QuerySessionByClientIdResponse response = client.getAcsResponse(request);
            System.out.println(new Gson().toJson(response));
        } catch (ServerException e) {
            e.printStackTrace();
        } catch (ClientException e) {
            System.out.println("ErrCode:" + e.getErrCode());
            System.out.println("ErrMsg:" + e.getErrMsg());
            System.out.println("RequestId:" + e.getRequestId());
        }

    }
}

5、测试效果
图片.png

更多参考

QuerySessionByClientId
快速使用 MQTT 的 Java SDK 收发消息(跨产品数据流入)
MQ发送普通消息
MQ订阅消息
阿里云微服务消息队列Token C# 设备端示例Demo
阿里云微服务消息队列Token Java Code Sample
阿里云微服务消息队列Token C# Code Sample

相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
1月前
|
消息中间件 DataWorks 物联网
MQTT问题之接入阿里云物联网平台如何解决
MQTT接入是指将设备或应用通过MQTT协议接入到消息服务器,以实现数据的发布和订阅;本合集着眼于MQTT接入的流程、配置指导以及常见接入问题的解决方法,帮助用户实现稳定可靠的消息交换。
116 1
|
1月前
|
消息中间件 网络协议 物联网
MQTT协议问题之阿里云物联网服务器断开如何解决
MQTT协议是一个轻量级的消息传输协议,设计用于物联网(IoT)环境中设备间的通信;本合集将详细阐述MQTT协议的基本原理、特性以及各种实际应用场景,供用户学习和参考。
97 1
|
4月前
|
消息中间件 网络协议 物联网
Golang微服务框架Kratos应用MQTT消息队列
MQTT 协议 是由`IBM`的`Andy Stanford-Clark博士`和`Arcom`(已更名为Eurotech)的`Arlen Nipper博士`于 1999 年发明,用于石油和天然气行业。工程师需要一种协议来实现最小带宽和最小电池损耗,以通过卫星监控石油管道。最初,该协议被称为消息队列遥测传输,得名于首先支持其初始阶段的 IBM 产品 MQ 系列。2010 年,IBM 发布了 MQTT 3.1 作为任何人都可以实施的免费开放协议,然后于 2013 年将其提交给结构化信息标准促进组织 (OASIS) 规范机构进行维护。2019 年,OASIS 发布了升级的 MQTT 版本 5。
33 0
|
4月前
|
XML 消息中间件 传感器
HTTP 与 MQTT:为您的 IoT 项目选择最佳协议
HTTP 与 MQTT:为您的 IoT 项目选择最佳协议
145 2
|
6月前
|
消息中间件 存储
MQTT 与消息队列的区别
MQTT 与消息队列的区别
168 1
|
6月前
|
编解码 小程序 JavaScript
阿里云IoT小程序应用开发和组件实践
通过实验,了解阿里云IoT小程序的应用开发的方法,了解其内置的基础组件使用,以及基于Vue.js实现可复用的自定义组件的方法。
304 1
|
6月前
|
运维 安全 物联网
使用阿里云 IoT 安全中心保护智慧遥控器
在物联网领域中,我们的 TO B 智慧设备,在发货之后,出现了不少困扰我们的安全问题,比如会被恶意安装应用,访问非法网站等,增加厂家的运维成本。 同时设备上的一些技术机密也容易被好事之人破解,对厂商构成商业损失,直到我们发现了阿里云物联网的一款安全防护产品 -- IoT 安全中心。它主打的 ID² 和安全运营有效的解决了我们的痛点。
380 3
|
7月前
|
传感器 监控 物联网
阿里云IoT HaaS 510:快速实现物联网数据传输的利器
众所周知,物联网(IoT)是近年来日益热门的技术领域之一,它的广泛应用为人们的生活和工作带来了无限可能。在物联网应用中,数据的采集和传输是至关重要的一环。DTU是一种应用于物联网数据传输的终端设备,它可以将各类传感器、数据采集单元等通过串口RS232/485传输到DTU,再由DTU转发到4G网络上传至云端。阿里云IoT HaaS 510是一款开板式DTU产品,能够帮助企业快速搭建物联网平台,并实现数据的采集和传输,那么本文就来简单分享一下。
334 1
阿里云IoT HaaS 510:快速实现物联网数据传输的利器
|
8月前
|
存储 安全 数据可视化
阿里云mqtt简介和优惠购买流程
MQTT(Message Queuing Telemetry Transport)是一种轻量级的通信协议,它可以在不同的设备和系统之间传递信息。阿里云是中国市场主流的云计算服务提供商,它提供了MQTT服务来支持IoT(Internet of Things)设备的通信。
|
9月前
|
自然语言处理 算法 物联网
阿里云正式发布「IoT消费电子应用引擎解决方案」,应用开发提效70%
阿里云正式发布「IoT消费电子应用引擎解决方案」,应用开发提效70%
201 0

相关产品

  • 云消息队列 MQ