
C/C++ 物联网 消息推送 iOS Qt MFC Linux Gtk 智能家居 IPC
原理 1、什么是域名?为什么要绑定域名?引用原话 域名(英文名字叫bai做domain name),也du叫做网域,实际zhi上就是一串在因特网计dao算机上zhuan的名字字符,而且是具有唯shu一性的,当传输数据时用做定位标识。域名是以英文符号“."来隔开,顶级域名是固定的,如.com/.cn/.net等等。在购买域名的时候选择自己想要的顶级域名,然后再从顶级域名左边输入自己想要注册的二级域名,组成一个完整的域名。 域名就相当于一个标识,也就是人的一个名字,人如果没有名字,让别人去记住一个人的话,就很困难了,名字也就这样出现了,而域名同理,就是一个网站的名字, 空间,就是一个人的躯体。 域名是用自己设定的字母,以及加上后缀名如:.com .cn .cnt .gov 等等来构成一串域名,一般常用com cn等域名,更具有权威性让人信服,而gov是政府网站,一般普通企业是申请不到的。 所以域名,不仅是一种标识,一种身份,而且访问起来更方便,让人更容易记住。 2、什么是域名解析?为什么要进行域名解析? 域名解析就是国际域名或者国内域名以及中文域名等域名申请后做的到IP地址的转换过程。IP地址是网路上标识您站点的数字地址,为了简单好记,采用域名来代替ip地址标识站点地址。域名的解析工作由DNS服务器完成。 DNS,就是Domain Name System的缩写,翻译过来就是域名系统,是互联网上作为域名和IP地址相互映射的一个分布式数据库。DNS能够使用户更方便的访问互联网,而不用去记住能够被机器直接读取的IP数串。通过域名,最终得到该域名对应的IP地址的过程叫做域名解析(或主机名解析) 。 解析过程,比如,一个域名为:abc.com,是想看到这个现HTTP服务,如果要访问网站,就要进行解析,首先在域名注册商那里通过专门的DNS服务器解析到一个WEB服务器的一个固定IP上:211.214.1.XXX,然后,通过WEB服务器来接收这个域名,把abc.com这个域名映射到这台服务器上。那么,输入abc.com这个域名就可以实现访问网站内容了.即实现了域名解析的全过程; DNS 数据库中包含的资源记录 (RR)。 每个 RR 标识数据库中的特定资源。我们在建立DNS服务器时,经常会用到SOA,NS,A之类的记录,在维护DNS服务器时,会用到MX,CNAME记录。 所以只有通过域名解析才能把域名转换成IP地址,才能够在网络上进行通信 3、CAME解析又是什么?CAME解析实际是一种DNS需要用到的一种资源记录。DNS将域名转换为IP,需要读取DNS数据库中的资源记录进行翻译。这些资源记录有多种翻译方式,如直接将域名翻译成IP的A记录、以及将域名翻译成另外一个域名的CAME记录等等 CNAME其实是DNS的别名记录,别名指向。可以为一个主机设置别名。比如可以理解为一个跳转。例如,域名www.abc.com 对应的真实源站IP为1.1.1.1,对应的CNAME为abcde12345.mozheanquanddos.com。或者简单来说就是允许多个名字映射到另一个域名,用于同时提供www和MAIL服务的计算机。说明:CNAME的目标主机地址只能使用主机名,不能使用IP地址;主机名前不能有任何其他前缀,如:http://等是不被允许的。CNAME对应真实的IP是不需要配置的,客户端会自动查询这个CNAME记录,最终得到一个IP(1.1.1.1)。 Step By Step 1、发布应用,点击绑定域名按钮,进入域名管理界面 域名管理页面 这里可以看到域名绑定的相关流程把这里的链接记录下来,待会要用到。 a120h1sf2cljmvso.vapp.cloudhost.link 2、添加CAME解析记录进入云解析DNS控制台页面https://dns.console.aliyun.com/?spm=a2c4g.11186623.2.22.190a7fdegUjM3W#/dns/domainList 选择或者添加备案成功过的域名,点击解析设备 进入解析设置界面,添加解析记录 点击添加记录 完成记录添加,点击启用 3、绑定域名 绑定成功 4、等待30分钟域名生效。 测试结果 一定要等半个小时,待域名生效。 1、ping yqbtest.pier39.cn可以看到CMANE记录域名成功解析至A记录的域名,A记录成功解析为IP地址 2、直接登录yqbtest.pier39.cn
1、 云端API介绍以及调试方法 https://help.aliyun.com/document_detail/69893.html?spm=a2c4g.11186623.6.749.1def3112gGOTHP 使用云端API之前务必先使用OpenAPI Explorer在线调试工具先调试通过。https://api.aliyun.com/?spm=a2c4g.11186623.2.12.240a3d29r9yc1o#/?product=Iot&version=2018-01-20&api=CreateProduct&tab=DEMO&lang=JAVA 其实我们的代码也就是从旁边的示例代码中搬运过来的。 2、使用common SDK调用云端API 前边已经说过,使用OpenAPI Explorer在线调试工具调试好之后,然后将示例代码放到自己的工程中以创建产品为例: 调试通过,然后就可以放心把代码搬出来了可以看到控制台上已经有新产品了。 编写代码 测试结果:控制台上也新建了产品源码:pom.xml <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>org.example</groupId> <artifactId>CallAPI</artifactId> <version>1.0-SNAPSHOT</version> <dependencies> <dependency> <groupId>com.aliyun</groupId> <artifactId>aliyun-java-sdk-core</artifactId> <version>4.5.3</version> </dependency> </dependencies> </project> CallAPI.Java package com.alibaba; import com.aliyuncs.CommonRequest; import com.aliyuncs.CommonResponse; import com.aliyuncs.DefaultAcsClient; import com.aliyuncs.IAcsClient; import com.aliyuncs.exceptions.ClientException; import com.aliyuncs.exceptions.ServerException; import com.aliyuncs.http.MethodType; import com.aliyuncs.profile.DefaultProfile; /* pom.xml <dependency> <groupId>com.aliyun</groupId> <artifactId>aliyun-java-sdk-core</artifactId> <version>4.5.3</version> </dependency> */ public class CallAPI { public static void main(String[] args) { DefaultProfile profile = DefaultProfile.getProfile("cn-shanghai", "LTAI4*******vJdtXvT3G", "Mp2f4q*****9ud7rOPGl"); IAcsClient client = new DefaultAcsClient(profile); CommonRequest request = new CommonRequest(); request.setSysMethod(MethodType.POST); request.setSysDomain("iot.cn-shanghai.aliyuncs.com"); request.setSysVersion("2018-01-20"); request.setSysAction("CreateProduct"); request.putQueryParameter("RegionId", "cn-shanghai"); request.putQueryParameter("NodeType", "0"); request.putQueryParameter("ProductName", "ProductAPITestDemo6"); request.putQueryParameter("DataFormat", "1"); request.putQueryParameter("Description", "测试云端API用"); request.putQueryParameter("AliyunCommodityCode", "iothub_senior"); request.putQueryParameter("AuthType", "secret"); try { CommonResponse response = client.getCommonResponse(request); System.out.println(response.getData()); } catch (ServerException e) { e.printStackTrace(); } catch (ClientException e) { e.printStackTrace(); } } } 3、使用iot SDK调用云端API 到官网下载Demo点击链接后,进入Github(没有账户就注册一个) 导入到工程中同样以创建产品为例,和common SDK一样要找到我们需要修改的参数AK SK等 发现调用到了ProductManager继续追踪 测试结果: 控制台上创建新产品成功 源码直接去官网下载
业务场景 1、设备端主动上报物模型属性,物联网平台更新物模型属性值2、前端下发更新物模型属性指令,物模型平台不更新属性值,平台将临时的物模型属性值再转发到设备端。3、云平台自发向设备端下发更新物模型属性指令(云平台流转->下发物模型属性),物模型平台不更新属性值。4、设备端从物模型接收到临时的物模型属性值后进行其业务逻辑处理。5、前端获取物模型属性值,物模型平台给真实的物模型属性值。6、服务端获取物模型属性值,物模型平台给真实的物模型属性值。 原理 1、物模型属性的真实值来源只有一个,设备端主动上报 2、物联网平台只有设备端主动上报才会更新真实的物模型属性值 剖析 1、为什么物模型真实来源只能是设备端上报,下发更新物模型属性指令为什么都不行?因为阿里云平台物模型属性的更新机制只认设备上报的属性,这又是为什么呢? (1)设备上报的属性肯定是体现了设备当前真正的状态(2)指令下发也有同步和异步的,并不能保证指令一下发,设备的真实状态就发生了变化,这样物模型属性就和设备真实的状态不同步了。 (3)设备端可能由于种种原因,或者其业务逻辑限制,指令下发下来并不代表一定会生效,并不代表一定要按照指令去执行(4)指令下发过程中指令也可能被丢失,由于网络波动,资源紧张等情况,指令被舍弃。(5)有可能同时存在多个客户端同时下发指令到设备,那到底谁才是正确的呢?那当然是设备端自己主动上报的那个咯。 2、什么样的物模型消息才是正确的呢?先看协议格式: 日志服务中有物模型才能代表物模型数据更新物模型消息只能代表向物模型topic发送了数据,不一定是正确的 来看下错误的格式: 正确的格式: 从这里可以看到,time字段和value字段是可选的,也就是物模型消息的时间戳可以有,也可以没有,但是就是不能没有Params字段 3、Payload格式注意事项 A、Payload格式必须为json格式 Payload上报数据会经过哪些流程? (1) 首先,如果集成了SDK,会先经过SDK的校验。如果没有集成SDK或者直接使用MQTT接口进行上报 那么会先经过平台的 《物模型消息》这一关,如果不是Json格式: (2)然后是《物模型check》这一关,虽然满足了json格式,但是如果不满足Alink Json协议 (3)《物模型check》如果满足Alink Json协议,但是部分数据不满足物模型定义AMQP可以正常订阅,物模型可以正常更新数据,但是物模型数据的日志没有生成。(后来验证又有一条错误日志,感觉平台还是有bug) AMQP: 经过后来多次验证,发现《物模型check》还是有个错误日志的,平台6332也没有定义这个错误码 (4)《物模型check》如果满足Alink Json协议,但是全部数据都不满足物模型定义物模型数据不会更新,AMQP正常订阅消息,但是消息内容里面有俩条5092的错误码日志中物模型check这一关会提示6332错误码,tsl parse failed (5)《物模型check》如果满足Alink Json协议,也满足物模型定义物模型check通过,物模型数据更新正常,AMQP消息订阅正常,内容正常物模型check通过 物模型数据 AMQP: B、带time的格式和不带time的格式都是可以成功上报的 不带time格式: "{"params":{"Temperature":99.99,"BatteryPercentage":88.88}}"; 带time格式 "{"params":{"Temperature":{"value":75.75,"time":1603248612000}}}"; C、如果上报的时候自带time,那么控制台上最新的数据按照控制台的时间排布,但是历史数据就是按照自带的time的数据eg:测试时间:2020-10-24 14:46:50验证:当上报数据time写死1603248612000 (2020/10/21 10:50:12){"params":{"Temperature":{"value":75.75,"time":1603248612000}}}历史数据: 但是最新数据也是他: AMQP订阅到的消息: 顺便再看看日志记录里是什么样: D、如果上报数据的时候不带time 历史数据就是最新数据 看看AMQP收到的消息: 再看看日志: 4、通过云平台流转至物模型属性下发怎样才能生效? 物模型属性的真实值来源只有一个,设备端主动上报。所以不管是云端下发指令,还是平台转发指令(如云平台流转)等,设备端需要接收到指令的后,再将该属性进行上报 小结 本片文章只是剖析,不讲实操,把原理讲通 1、物模型属性的真实值来源只有一个,设备端主动上报2、物联网平台只有设备端主动上报才会更新真实的物模型属性值3、Payload注意事项4、物模型消息的正确格式 后续再逐步更新实操文档: 《阿里云物联网平台物模型属性下发如何才能生效》《阿里云物联网平台通过云平台流转实现设备间物模型属性的同步》《阿里云物联网云平台物模型流转SQL编写技巧》
业务场景 1、子设备与子设备、网关与子设备之间通过业务协议进行通信(ZigBee、wifi、蓝牙等)2、网关设备连接物联网平台,创建物理通道,并代理子设备上线创建逻辑通道3、子设备通过逻辑通道与物联网平台进行通信,但是实际不建立任何物理上的连接。 原理介绍1、子设备上报物模型,首先得满足网关-子设备架构,先要创建网关和子设备产品 关于网关产品和子设备的产品创建这里就不过多赘述。物模型上报,首先肯定得有物模型的定义吧。 2、子设备和网关要建立一种关联关系,建立网关与子设备的topo结构。(这样网关和平台才知道,谁是这个网关的子设备)(1)可以直接在控制台上进行操作 (2)可以通过云端API进行操作 (3)可以通过SDK的接口进行操作 C-SDK 4.x 接口 components/subdev/aiot_subdev_api.h 3、网关设备创建与物联网平台的链路,再代理子设备进行上线(注意:这些操作都是在网关上完成) 网关设备如同普通直连设备一样,与物联网平台建立连接,即网关-子设备架构中的物理通道 代理子设备上线,帮助子设备与物联网平台建立逻辑通道 components/subdev/aiot_subdev_api.h 4、子设备通过网关与物联网平台的物理链路上报消息到物联网平台(注意:这些操作都是在网关上完成,子设备只需要提供三元组信息) *不管是MQTT API还是物模型API,他们的底层原理都是一样的,网关设备和物联网平台进行直连,存在唯一的一条物理通道,而子设备通过网关代理上线,使用逻辑通道与平台进行通信,复用的是网关的物理通道 * 官网说明:可以看到这里有两种方式,分别是基于MQTT API的接口和基于物模型API的接口。 那么aiot_mqtt_pub和aiot_dm_send又是如何体现物理通道和逻辑通道呢?(1) MQTT API aiot_mqtt_pub()接口方式 core/aiot_mqtt_api.h 直接调用aiot_mqtt_pub传入带子设备信息的Topic (2) 物模型API aiot_dm_send()接口方式在aiot_dm_send()的两个参数中handle使用网关代理上线后的那个handle,msg携带子设备的信息,完成逻辑通道的消息传输。 components/data-model/aiot_dm_api.h 这个接口里面优先使用的是msg里面携带的product key和devicename来组装topic这个msg结果如下aiot_dm_msg_t 操作步骤1、创建网关产品和子设备的产品&&设备,并建立Topo结构,这里使用控制台方式方便快捷(此篇文章主要介绍子设备的物模型上报) 子设备产品定义一个测试用的物模型属性 2、打开C-SDK 4.x Demos demos/subdev_basic_demo.c (1) 修改网关设备三元组信息以及url在demo_mqtt_start()函数中(2)修改子设备的三元组信息全局变量aiot_subdev_dev_t g_subdev[](3)添加demo_subdev_send_property_post接口,上报子设备物模型数据 增加头文件包含 编写demo_subdev_send_property_post接口 (4) 在main函数中,子设备代理上线成功之后,添加子设备上报物模型逻辑,调用demo_subdev_send_property_post接口 (5)在main函数中,在 物模型API上报子设备的物模型逻辑之后,再添加一个通过MQTT API接口上报物模型的逻辑 (6)在C-SDK 4.x主目录 make cd output./subdev-basic-demo 我们可以看到三个子设备的物模型数据都上报成功温度分别是0.9 、1.9、 2.9音量分别是88、89、90 控制台中查看物模型数据:子设备1 子设备2 子设备3 修改后的Demo代码源码:demos/subdev_basic_demo.c /* * 这个例程适用于`Linux`这类支持pthread的POSIX设备, 它演示了...... * * 需要用户关注或修改的部分, 已经用 TODO 在注释中标明 * */ #include <stdio.h> #include <string.h> #include <unistd.h> #include <pthread.h> #include "aiot_state_api.h" #include "aiot_sysdep_api.h" #include "aiot_mqtt_api.h" #include "aiot_subdev_api.h" #include "aiot_dm_api.h" /* 位于portfiles/aiot_port文件夹下的系统适配函数集合 */ extern aiot_sysdep_portfile_t g_aiot_sysdep_portfile; /* 位于external/ali_ca_cert.c中的服务器证书 */ extern const char *ali_ca_cert; static pthread_t g_mqtt_process_thread; static pthread_t g_mqtt_recv_thread; static uint8_t g_mqtt_process_thread_running = 0; static uint8_t g_mqtt_recv_thread_running = 0; //TODO:修改子设备三元组信息 aiot_subdev_dev_t g_subdev[] = { { "a1H****0mgG", "IoT******bDemo1", "e98c465******6dcb35cfa75", "prGp****OjLS7GI" }, { "a1H*****mgG", "IoTDe*****Demo2", "92d719*****67f65bfe4e4d895", "prG*****jLS7GI" }, { "a1H9wUf0mgG", "IoTDeviceSubDemo3", "b2d20a9bb7dbe34e97a4c56f47aed6ee", "prG*****LS7GI" } /* { "a13FN5TplKq", "subdev_basic_demo_01", "768XBgQwgOakz3K4uhOiLeeh9xjJQx6h", "y7GSILD480lBSsP8" }, { "a13FN5TplKq", "subdev_basic_demo_02", "iwTZrbjbgNVChfuJkihjE5asekoyKoYv", "y7GSILD480lBSsP8" }, { "a13FN5TplKq", "subdev_basic_demo_03", "fdutq35iKMYdcWWBuIINY26hsNhgFXWE", "y7GSILD480lBSsP8" }, { "a13FN5TplKq", "subdev_basic_demo_04", "HCKv50YqgwdKhy5cE0Vz4aydmK2ojPvr", "y7GSILD480lBSsP8" } */ }; /* TODO: 如果要关闭日志, 就把这个函数实现为空, 如果要减少日志, 可根据code选择不打印 * * 例如: [1577589489.033][LK-0317] subdev_basic_demo&a13FN5TplKq * * 上面这条日志的code就是0317(十六进制), code值的定义见core/aiot_state_api.h * */ /* 日志回调函数, SDK的日志会从这里输出 */ static int32_t demo_state_logcb(int32_t code, char *message) { printf("%s", message); return 0; } /* MQTT事件回调函数, 当网络连接/重连/断开时被触发, 事件定义见core/aiot_mqtt_api.h */ void demo_mqtt_event_handler(void *handle, const aiot_mqtt_event_t *event, void *userdata) { switch (event->type) { /* SDK因为用户调用了aiot_mqtt_connect()接口, 与mqtt服务器建立连接已成功 */ case AIOT_MQTTEVT_CONNECT: { printf("AIOT_MQTTEVT_CONNECT\n"); /* TODO: 处理SDK建连成功, 不可以在这里调用耗时较长的阻塞函数 */ } break; /* SDK因为网络状况被动断连后, 自动发起重连已成功 */ case AIOT_MQTTEVT_RECONNECT: { printf("AIOT_MQTTEVT_RECONNECT\n"); /* TODO: 处理SDK重连成功, 不可以在这里调用耗时较长的阻塞函数 */ } break; /* SDK因为网络的状况而被动断开了连接, network是底层读写失败, heartbeat是没有按预期得到服务端心跳应答 */ case AIOT_MQTTEVT_DISCONNECT: { char *cause = (event->data.disconnect == AIOT_MQTTDISCONNEVT_NETWORK_DISCONNECT) ? ("network disconnect") : ("heartbeat disconnect"); printf("AIOT_MQTTEVT_DISCONNECT: %s\n", cause); /* TODO: 处理SDK被动断连, 不可以在这里调用耗时较长的阻塞函数 */ } break; default: { } } } /* MQTT默认消息处理回调, 当SDK从服务器收到MQTT消息时, 且无对应用户回调处理时被调用 */ void demo_mqtt_default_recv_handler(void *handle, const aiot_mqtt_recv_t *packet, void *userdata) { switch (packet->type) { case AIOT_MQTTRECV_HEARTBEAT_RESPONSE: { printf("heartbeat response\n"); /* TODO: 处理服务器对心跳的回应, 一般不处理 */ } break; case AIOT_MQTTRECV_SUB_ACK: { printf("suback, res: -0x%04X, packet id: %d, max qos: %d\n", -packet->data.sub_ack.res, packet->data.sub_ack.packet_id, packet->data.sub_ack.max_qos); /* TODO: 处理服务器对订阅请求的回应, 一般不处理 */ } break; case AIOT_MQTTRECV_PUB: { printf("pub, qos: %d, topic: %.*s\n", packet->data.pub.qos, packet->data.pub.topic_len, packet->data.pub.topic); printf("pub, payload: %.*s\n", packet->data.pub.payload_len, packet->data.pub.payload); /* TODO: 处理服务器下发的业务报文 */ } break; case AIOT_MQTTRECV_PUB_ACK: { printf("puback, packet id: %d\n", packet->data.pub_ack.packet_id); /* TODO: 处理服务器对QoS1上报消息的回应, 一般不处理 */ } break; default: { } } } /* 执行aiot_mqtt_process的线程, 包含心跳发送和QoS1消息重发 */ void *demo_mqtt_process_thread(void *args) { int32_t res = STATE_SUCCESS; while (g_mqtt_process_thread_running) { res = aiot_mqtt_process(args); if (res == STATE_USER_INPUT_EXEC_DISABLED) { break; } sleep(1); } return NULL; } /* 执行aiot_mqtt_recv的线程, 包含网络自动重连和从服务器收取MQTT消息 */ void *demo_mqtt_recv_thread(void *args) { int32_t res = STATE_SUCCESS; while (g_mqtt_recv_thread_running) { res = aiot_mqtt_recv(args); if (res < STATE_SUCCESS) { if (res == STATE_USER_INPUT_EXEC_DISABLED) { break; } sleep(1); } } return NULL; } int32_t demo_mqtt_start(void **handle) { int32_t res = STATE_SUCCESS; void *mqtt_handle = NULL; char *url = "iot-as-mqtt.cn-shanghai.aliyuncs.com"; /* 阿里云平台上海站点的域名后缀 */ char host[100] = {0}; /* 用这个数组拼接设备连接的云平台站点全地址, 规则是 ${productKey}.iot-as-mqtt.cn-shanghai.aliyuncs.com */ uint16_t port = 443; /* 无论设备是否使用TLS连接阿里云平台, 目的端口都是443 */ aiot_sysdep_network_cred_t cred; /* 安全凭据结构体, 如果要用TLS, 这个结构体中配置CA证书等参数 */ /* TODO: 替换为自己设备的三元组 */ char *product_key = "a1L****pp"; char *device_name = "Io*****Demo1"; char *device_secret = "68b10******a4b052a197538"; /* 创建SDK的安全凭据, 用于建立TLS连接 */ memset(&cred, 0, sizeof(aiot_sysdep_network_cred_t)); cred.option = AIOT_SYSDEP_NETWORK_CRED_SVRCERT_CA; /* 使用RSA证书校验MQTT服务端 */ cred.max_tls_fragment = 16384; /* 最大的分片长度为16K, 其它可选值还有4K, 2K, 1K, 0.5K */ cred.sni_enabled = 1; /* TLS建连时, 支持Server Name Indicator */ cred.x509_server_cert = ali_ca_cert; /* 用来验证MQTT服务端的RSA根证书 */ cred.x509_server_cert_len = strlen(ali_ca_cert); /* 用来验证MQTT服务端的RSA根证书长度 */ /* 创建1个MQTT客户端实例并内部初始化默认参数 */ mqtt_handle = aiot_mqtt_init(); if (mqtt_handle == NULL) { printf("aiot_mqtt_init failed\n"); return -1; } /* TODO: 如果以下代码不被注释, 则例程会用TCP而不是TLS连接云平台 */ /* { memset(&cred, 0, sizeof(aiot_sysdep_network_cred_t)); cred.option = AIOT_SYSDEP_NETWORK_CRED_NONE; } */ snprintf(host, 100, "%s.%s", product_key, url); /* 配置MQTT服务器地址 */ aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_HOST, (void *)host); /* 配置MQTT服务器端口 */ aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_PORT, (void *)&port); /* 配置设备productKey */ aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_PRODUCT_KEY, (void *)product_key); /* 配置设备deviceName */ aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_DEVICE_NAME, (void *)device_name); /* 配置设备deviceSecret */ aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_DEVICE_SECRET, (void *)device_secret); /* 配置网络连接的安全凭据, 上面已经创建好了 */ aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_NETWORK_CRED, (void *)&cred); /* 配置MQTT默认消息接收回调函数 */ aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_RECV_HANDLER, (void *)demo_mqtt_default_recv_handler); /* 配置MQTT事件回调函数 */ aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_EVENT_HANDLER, (void *)demo_mqtt_event_handler); /* 与服务器建立MQTT连接 */ res = aiot_mqtt_connect(mqtt_handle); if (res < STATE_SUCCESS) { /* 尝试建立连接失败, 销毁MQTT实例, 回收资源 */ aiot_mqtt_deinit(&mqtt_handle); printf("aiot_mqtt_connect failed: -0x%04X\n", -res); return -1; } /* 创建一个单独的线程, 专用于执行aiot_mqtt_process, 它会自动发送心跳保活, 以及重发QoS1的未应答报文 */ g_mqtt_process_thread_running = 1; res = pthread_create(&g_mqtt_process_thread, NULL, demo_mqtt_process_thread, mqtt_handle); if (res < 0) { printf("pthread_create demo_mqtt_process_thread failed: %d\n", res); aiot_mqtt_deinit(&mqtt_handle); return -1; } /* 创建一个单独的线程用于执行aiot_mqtt_recv, 它会循环收取服务器下发的MQTT消息, 并在断线时自动重连 */ g_mqtt_recv_thread_running = 1; res = pthread_create(&g_mqtt_recv_thread, NULL, demo_mqtt_recv_thread, mqtt_handle); if (res < 0) { printf("pthread_create demo_mqtt_recv_thread failed: %d\n", res); g_mqtt_process_thread_running = 0; pthread_join(g_mqtt_process_thread, NULL); aiot_mqtt_deinit(&mqtt_handle); return -1; } *handle = mqtt_handle; return 0; } int32_t demo_mqtt_stop(void **handle) { int32_t res = STATE_SUCCESS; void *mqtt_handle = NULL; mqtt_handle = *handle; g_mqtt_process_thread_running = 0; g_mqtt_recv_thread_running = 0; pthread_join(g_mqtt_process_thread, NULL); pthread_join(g_mqtt_recv_thread, NULL); /* 断开MQTT连接 */ res = aiot_mqtt_disconnect(mqtt_handle); if (res < STATE_SUCCESS) { aiot_mqtt_deinit(&mqtt_handle); printf("aiot_mqtt_disconnect failed: -0x%04X\n", -res); return -1; } /* 销毁MQTT实例 */ res = aiot_mqtt_deinit(&mqtt_handle); if (res < STATE_SUCCESS) { printf("aiot_mqtt_deinit failed: -0x%04X\n", -res); return -1; } return 0; } void demo_subdev_recv_handler(void *handle, const aiot_subdev_recv_t *packet, void *user_data) { switch (packet->type) { case AIOT_SUBDEVRECV_TOPO_ADD_REPLY: case AIOT_SUBDEVRECV_TOPO_DELETE_REPLY: case AIOT_SUBDEVRECV_TOPO_GET_REPLY: case AIOT_SUBDEVRECV_BATCH_LOGIN_REPLY: case AIOT_SUBDEVRECV_BATCH_LOGOUT_REPLY: case AIOT_SUBDEVRECV_SUB_REGISTER_REPLY: case AIOT_SUBDEVRECV_PRODUCT_REGISTER_REPLY: { printf("msgid : %d\n", packet->data.generic_reply.msg_id); printf("code : %d\n", packet->data.generic_reply.code); printf("product key : %s\n", packet->data.generic_reply.product_key); printf("device name : %s\n", packet->data.generic_reply.device_name); printf("message : %s\n", (packet->data.generic_reply.message == NULL)?("NULL"):(packet->data.generic_reply.message)); printf("data : %s\n", packet->data.generic_reply.data); } break; case AIOT_SUBDEVRECV_TOPO_CHANGE_NOTIFY: { printf("msgid : %d\n", packet->data.generic_notify.msg_id); printf("product key : %s\n", packet->data.generic_notify.product_key); printf("device name : %s\n", packet->data.generic_notify.device_name); printf("params : %s\n", packet->data.generic_notify.params); } break; default: { } } } /* 子设备物模型属性上报函数演示 */ int32_t demo_subdev_send_property_post(void *dm_handle, char *params, char *subdev_product_key,char *subdev_device_name) { aiot_dm_msg_t msg; memset(&msg, 0, sizeof(aiot_dm_msg_t)); msg.type = AIOT_DMMSG_PROPERTY_POST; msg.data.property_post.params = params; msg.product_key = subdev_product_key; msg.device_name = subdev_device_name; return aiot_dm_send(dm_handle, &msg); } int main(int argc, char *argv[]) { int32_t res = STATE_SUCCESS; void *mqtt_handle = NULL, *subdev_handle = NULL; /* 配置SDK的底层依赖 */ aiot_sysdep_set_portfile(&g_aiot_sysdep_portfile); /* 配置SDK的日志输出 */ aiot_state_set_logcb(demo_state_logcb); res = demo_mqtt_start(&mqtt_handle); if (res < 0) { printf("demo_mqtt_start failed\n"); return -1; } subdev_handle = aiot_subdev_init(); if (subdev_handle == NULL) { printf("aiot_subdev_init failed\n"); demo_mqtt_stop(&mqtt_handle); return -1; } aiot_subdev_setopt(subdev_handle, AIOT_SUBDEVOPT_MQTT_HANDLE, mqtt_handle); aiot_subdev_setopt(subdev_handle, AIOT_SUBDEVOPT_RECV_HANDLER, demo_subdev_recv_handler); res = aiot_subdev_send_topo_add(subdev_handle, g_subdev, sizeof(g_subdev)/sizeof(aiot_subdev_dev_t)); if (res < STATE_SUCCESS) { printf("aiot_subdev_send_topo_add failed, res: -0x%04X\n", -res); aiot_subdev_deinit(&subdev_handle); demo_mqtt_stop(&mqtt_handle); return -1; } sleep(2); // aiot_subdev_send_topo_delete(subdev_handle, g_subdev, sizeof(g_subdev)/sizeof(aiot_subdev_dev_t)); // if (res < STATE_SUCCESS) { // printf("aiot_subdev_send_topo_delete failed, res: -0x%04X\n", -res); // aiot_subdev_deinit(&subdev_handle); // demo_mqtt_stop(&mqtt_handle); // return -1; // } // sleep(2); // aiot_subdev_send_topo_get(subdev_handle); // if (res < STATE_SUCCESS) { // printf("aiot_subdev_send_topo_get failed, res: -0x%04X\n", -res); // aiot_subdev_deinit(&subdev_handle); // demo_mqtt_stop(&mqtt_handle); // return -1; // } // sleep(2); // aiot_subdev_send_sub_register(subdev_handle, g_subdev, sizeof(g_subdev)/sizeof(aiot_subdev_dev_t)); // if (res < STATE_SUCCESS) { // printf("aiot_subdev_send_sub_register failed, res: -0x%04X\n", -res); // aiot_subdev_deinit(&subdev_handle); // demo_mqtt_stop(&mqtt_handle); // return -1; // } // sleep(2); // aiot_subdev_send_product_register(subdev_handle, g_subdev, sizeof(g_subdev)/sizeof(aiot_subdev_dev_t)); // if (res < STATE_SUCCESS) { // printf("aiot_subdev_send_product_register failed, res: -0x%04X\n", -res); // aiot_subdev_deinit(&subdev_handle); // demo_mqtt_stop(&mqtt_handle); // return -1; // } // sleep(2); aiot_subdev_send_batch_login(subdev_handle, g_subdev, sizeof(g_subdev)/sizeof(aiot_subdev_dev_t)); if (res < STATE_SUCCESS) { printf("aiot_subdev_send_batch_login failed, res: -0x%04X\n", -res); aiot_subdev_deinit(&subdev_handle); demo_mqtt_stop(&mqtt_handle); return -1; } sleep(2); // aiot_subdev_send_batch_logout(subdev_handle, g_subdev, sizeof(g_subdev)/sizeof(aiot_subdev_dev_t)); // if (res < STATE_SUCCESS) { // printf("aiot_subdev_send_batch_logout failed, res: -0x%04X\n", -res); // aiot_subdev_deinit(&subdev_handle); // demo_mqtt_stop(&mqtt_handle); // return -1; // } /* 子设备上报物模型属性 基于物模型API接口*/ for(int i = 0; i < sizeof(g_subdev)/sizeof(aiot_subdev_dev_t) ; i++) { /* *********************************************************************************** * 替换为子设备的ProductKey 和 子设备的deviceName */ char *product_key = g_subdev[i].product_key; char *device_name = g_subdev[i].device_name; static float cnt = 0.9; char property_payload[30] = {0}; snprintf(property_payload, sizeof(property_payload), "{\"Temperature\": %f}", cnt++); //HAL_Snprintf(property_payload, sizeof(property_payload), "{\"Temperature\": {\"value\": %d,\"time\":1603248304}}", cnt++); //snprintf(property_payload, sizeof(property_payload), "{\"Temperature\": {\"value\": 77.77,\"time\":1603248304000}}"); demo_subdev_send_property_post(subdev_handle,property_payload,product_key,device_name); //demo_subdev_send_property_post(mqtt_handle,property_payload,product_key,device_name); /*********************************************************************************************/ sleep(1); } /* 子设备上报物模型属性 基于MQTT API接口*/ for(int i = 0; i < sizeof(g_subdev)/sizeof(aiot_subdev_dev_t) ; i++) { /* *********************************************************************************** * 替换为子设备的ProductKey 和 子设备的deviceName */ char *product_key2 = g_subdev[i].product_key; char *device_name2 = g_subdev[i].device_name; static int cnt2 = 88; char property_payload2[30] = {0}; snprintf(property_payload2, sizeof(property_payload2), "{\"params\":{\"Volume\": %d}}", cnt2++); //HAL_Snprintf(property_payload, sizeof(property_payload), "{\"Temperature\": {\"value\": %d,\"time\":1603248304}}", cnt++); //snprintf(property_payload, sizeof(property_payload), "{\"Temperature\": {\"value\": 77.77,\"time\":1603248304000}}"); char pub_topic[128] = {0}; snprintf(pub_topic,sizeof(pub_topic),"/sys/%s/%s/thing/event/property/post",product_key2,device_name2); res = aiot_mqtt_pub(mqtt_handle, pub_topic, (uint8_t *)property_payload2, (uint32_t)strlen(property_payload2), 0); //res = aiot_mqtt_pub(subdev_handle, pub_topic, (uint8_t *)property_payload2, (uint32_t)strlen(property_payload2), 0); if (res < 0) { printf("aiot_mqtt_sub failed, res: -0x%04X\n", -res); return -1; } /*********************************************************************************************/ sleep(1); } res = aiot_subdev_deinit(&subdev_handle); if (res < STATE_SUCCESS) { printf("aiot_subdev_deinit failed: -0x%04X\n", res); } res = demo_mqtt_stop(&mqtt_handle); if (res < 0) { printf("demo_start_stop failed\n"); return -1; } return 0; }
业务场景 1、云端向设备发送的下行消息或者异步服务调用到平台也算结束,平台再向设备进行一个透传2、设备端向平台发送的上行消息到平台就算结束3、云端通过服务端订阅来获取设备上行的消息 原理1、要想获取异步服务调用的返回结果,首先设备得有返回(1) 设备接收平台端透传过来的异步服务调用 (2)设备端收到数据后进行响应2、这里的设备响应结果发送给平台之后,平台可通过云产品流传或服务端订阅,再将消息发送给云端方式一:云平台流转,注意选择Topic:/${productKey}/${deviceName}/thing/downlink/reply/message这个为什么可行?可参考官方文档说明然后添加操作为发布到AMQP消费组 方式二:AMQP服务端订阅订阅什么呢?勾选设备上报消息即可(前提是设备端有返回,即满足原理1的前提) 操作步骤不多说了,直接上步骤1、准备测试用的产品和设备主要是定义一个异步服务:这里不详细阐述 2、准备测试用的云端调试工具可以是集成云端SDK的Demo,可以是业务逻辑应用调用云端API,最简单的直接使用云端API在线调试工具具体参数填写规范,这里也不做详细阐述 3、物联网平台控制台上配置好规则引擎(1)云平台流转选择好产品设备和topic 注意SQL语句的编写,这里的字段就是要发送给AMQP客户端的消息内容,可以事先进行调试。 这里要注意AMQP客户端都是按照既定的协议格式进行过滤数据的,所以这里的消息内容需要按照协议进行配置 确定好消息内容后 SQL语句: SELECT timestamp('yyyy-MM-dd\'T\'HH:mm:ss\'Z\'') as 云平台流转至AMQP测试,deviceName() as deviceName, code as code,data as data,topic() as topic,messageId() as requestId,id as id,topic(1) as productKey,iotId as iotId FROM "/a16hDZJpRCl/IoTDeviceDemo1thing/downlink/reply/message" WHERE (2)服务端订阅勾选设备上报消息即可,具体消费组怎么创建就不详细阐述 实测效果: 4、设备端接收消息+响应reply 代码示例:pom.xml: <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>org.example</groupId> <artifactId>MQTTClient</artifactId> <version>1.0-SNAPSHOT</version> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <configuration> <source>6</source> <target>6</target> </configuration> </plugin> </plugins> </build> <dependencies> <dependency> <groupId>org.eclipse.paho</groupId> <artifactId>org.eclipse.paho.client.mqttv3</artifactId> <version>1.1.0</version> </dependency> <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> <version>23.0</version> </dependency> <dependency> <groupId>com.aliyun</groupId> <artifactId>aliyun-java-sdk-core</artifactId> <version>3.5.1</version> </dependency> </dependencies> </project> AliyunIoTSignUtil: package com.alibaba.taro; import javax.crypto.Mac; import javax.crypto.SecretKey; import javax.crypto.spec.SecretKeySpec; import java.util.Arrays; import java.util.Map; /** * AliyunIoTSignUtil */ public class AliyunIoTSignUtil { public static String sign(Map<String, String> params, String deviceSecret, String signMethod) { //将参数Key按字典顺序排序 String[] sortedKeys = params.keySet().toArray(new String[] {}); Arrays.sort(sortedKeys); //生成规范化请求字符串 StringBuilder canonicalizedQueryString = new StringBuilder(); for (String key : sortedKeys) { if ("sign".equalsIgnoreCase(key)) { continue; } canonicalizedQueryString.append(key).append(params.get(key)); } try { String key = deviceSecret; return encryptHMAC(signMethod,canonicalizedQueryString.toString(), key); } catch (Exception e) { throw new RuntimeException(e); } } /** * HMACSHA1加密 * */ public static String encryptHMAC(String signMethod,String content, String key) throws Exception { SecretKey secretKey = new SecretKeySpec(key.getBytes("utf-8"), signMethod); Mac mac = Mac.getInstance(secretKey.getAlgorithm()); mac.init(secretKey); byte[] data = mac.doFinal(content.getBytes("utf-8")); return bytesToHexString(data); } public static final String bytesToHexString(byte[] bArray) { StringBuffer sb = new StringBuffer(bArray.length); String sTemp; for (int i = 0; i < bArray.length; i++) { sTemp = Integer.toHexString(0xFF & bArray[i]); if (sTemp.length() < 2) { sb.append(0); } sb.append(sTemp.toUpperCase()); } return sb.toString(); } } Demo: package com.alibaba; import com.alibaba.taro.AliyunIoTSignUtil; import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.json.JSONObject; import java.util.HashMap; import java.util.Map; public class CustomTopicMessageDemo2 { public static String productKey = "a16hD*****"; public static String deviceName = "IoTDevice****"; public static String deviceSecret = "0895205d*********"; public static String regionId = "cn-shanghai"; // 物模型-属性上报topic //private static String pubTopic = "/sys/" + productKey + "/" + deviceName + "/thing/event/property/post"; //private static String subTopic = "/sys/" + productKey + "/" + deviceName + "/thing/service/property/set"; // 自定义topic,在产品Topic列表位置定义 //private static String pubTopic = "/"+productKey + "/" + deviceName+"/user/DemoTest"; //private static String subTopic = "/"+productKey + "/" + deviceName+"/user/DemoTest"; private static String pubTopic = "/"+productKey + "/" + deviceName+"/user/get"; private static String subTopic = "/"+productKey + "/" + deviceName+"/user/get"; private static MqttClient mqttClient; public static void main(String [] args){ initAliyunIoTClient(); // ScheduledExecutorService scheduledThreadPool = new ScheduledThreadPoolExecutor(1, // new ThreadFactoryBuilder().setNameFormat("thread-runner-%d").build()); // // scheduledThreadPool.scheduleAtFixedRate(()->postDeviceProperties(), 10,10, TimeUnit.SECONDS); // 汇报属性 //String payloadJson = "{\"params\":{\"MasterLightSwitch\":0,\"LivingLightSwitch\":0,\"SecondaryLightSwotch\":0,\"MasterCurtainSwitch\":1,\"SecondaryCurtainSwitch\":1,\"LivingCurtainSwitch\":1}}"; //String payloadJson = "{\"params\":{\"Temp\":77,\"yyy\":{\"tttt\":\"123\"}}}"; String payloadJson = "{\"params\":{\"Temp\":77,\"yyy\":\"8888\"}}"; //String payloadJson = "{\"tts\":\"ss\"}"; //String payloadJson = "34454545"; postDeviceProperties(payloadJson); try { mqttClient.subscribe(subTopic); // 订阅Topic } catch (MqttException e) { System.out.println("error:" + e.getMessage()); e.printStackTrace(); } // 设置订阅监听 mqttClient.setCallback(new MqttCallback() { @Override public void connectionLost(Throwable throwable) { System.out.println("connection Lost"); } @Override public void messageArrived(String s, MqttMessage mqttMessage) throws Exception { String payload = new String(mqttMessage.getPayload()); System.out.println(" 接收消息:"); System.out.println("Topic : " + s); System.out.println(payload); //打印输出消息payLoad System.out.println("================================================================="); // String subTopic = "/sys/" + productKey + "/" + deviceName + "/thing/service/property/set"; // if(s.equals(subTopic)) { // JSONObject jsonProperty = new JSONObject(payload); // if(jsonProperty.has("params")) // { // String paramsJson = jsonProperty.get("params").toString(); // System.out.println("test paramsJson is:\n" + paramsJson); // String params = "{\"params\": " + paramsJson + "}"; // System.out.println("test params is:\n" + params); // System.out.println("收到属性设置后,再上报一次属性:"); // postDeviceProperties(params); // } // } //收到服务调用,给予返回reply // 下行(Alink JSON): // 请求Topic:/sys/{productKey}/{deviceName}/thing/service/{tsl.service.identifier} // 响应Topic:/sys/{productKey}/{deviceName}/thing/service/{tsl.service.identifier}_reply String subTopic = "/sys/" + productKey + "/" + deviceName + "/thing/service/StartP2PStreaming"; String replyTopic = "/sys/" + productKey + "/" + deviceName + "/thing/service/StartP2PStreaming_reply"; if(s.equals(subTopic)) { JSONObject jsonProperty = new JSONObject(payload); if(jsonProperty.has("id")) { String id = jsonProperty.get("id").toString(); String replyJson = "{\"data\":{},\"code\":200,\"id\":\""+ id +"\"}"; //System.out.println("test replyJson is:\n" + replyJson); //String replys = "{\"params\": " + replyJson + "}"; //System.out.println("test reply is:\n" + replys); System.out.println("收到服务调用后,给予返回"); postServiceReply(replyJson,replyTopic); } } } @Override public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { } }); } /** * 初始化 Client 对象 */ private static void initAliyunIoTClient() { try { // 构造连接需要的参数 String clientId = "java" + System.currentTimeMillis(); Map<String, String> params = new HashMap<String, String>(16); params.put("productKey", productKey); params.put("deviceName", deviceName); params.put("clientId", clientId); String timestamp = String.valueOf(System.currentTimeMillis()); params.put("timestamp", timestamp); // cn-shanghai String targetServer = "tcp://" + productKey + ".iot-as-mqtt."+regionId+".aliyuncs.com:1883"; String mqttclientId = clientId + "|securemode=3,signmethod=hmacsha1,timestamp=" + timestamp + "|"; String mqttUsername = deviceName + "&" + productKey; String mqttPassword = AliyunIoTSignUtil.sign(params, deviceSecret, "hmacsha1"); connectMqtt(targetServer, mqttclientId, mqttUsername, mqttPassword); } catch (Exception e) { System.out.println("initAliyunIoTClient error " + e.getMessage()); } } public static void connectMqtt(String url, String clientId, String mqttUsername, String mqttPassword) throws Exception { MemoryPersistence persistence = new MemoryPersistence(); mqttClient = new MqttClient(url, clientId, persistence); MqttConnectOptions connOpts = new MqttConnectOptions(); // MQTT 3.1.1 connOpts.setMqttVersion(4); connOpts.setAutomaticReconnect(false); connOpts.setCleanSession(false); //connOpts.setCleanSession(true); connOpts.setUserName(mqttUsername); connOpts.setPassword(mqttPassword.toCharArray()); connOpts.setKeepAliveInterval(60); mqttClient.connect(connOpts); } /** * 汇报属性 */ private static void postDeviceProperties(String payloadJson) { try { //上报数据 //高级版 物模型-属性上报payload System.out.println("上报属性值:"); //String payloadJson = "{\"params\":{\"Status\":0,\"Data\":\"15\"}}"; //String payloadJson = "{\"GeoLocation\":{\"Longitude\":120.99,\"Latitude\":30.13,\"Altitude\":39.01},\"BatteryPercentage\":40.703533, \"Temperature\":2.233362}"; //String payloadJson = "{\"id\":\"3\",\"version\":\"1.0\",\"params\":{\"GeoLocation\":{\"Longitude\":120.999,\"Latitude\":30.13,\"Altitude\":39.01},\"BatteryPercentage\":42.99999, \"Temperature\":2.233362}}"; //String payloadJson = "{\"params\":{\"MasterLightSwitch\":0,\"LivingLightSwitch\":0,\"SecondaryLightSwotch\":0,\"MasterCurtainSwitch\":1,\"SecondaryCurtainSwitch\":1,\"LivingCurtainSwitch\":1}}"; System.out.println(payloadJson); MqttMessage message = new MqttMessage(payloadJson.getBytes("utf-8")); message.setQos(0); mqttClient.publish(pubTopic, message); System.out.println("================================================================="); } catch (Exception e) { System.out.println(e.getMessage()); } } /** * 服务返回 */ private static void postServiceReply(String payloadJson,String relpyTopic) { try { //上报数据 //高级版 物模型-属性上报payload System.out.println("服务调用返回:"); //String payloadJson = "{\"params\":{\"Status\":0,\"Data\":\"15\"}}"; System.out.println("Topic:"); System.out.println(relpyTopic); System.out.println(payloadJson); MqttMessage message = new MqttMessage(payloadJson.getBytes("utf-8")); message.setQos(0); mqttClient.publish(relpyTopic, message); System.out.println("================================================================="); } catch (Exception e) { System.out.println(e.getMessage()); } } } 实测效果: 5、云端使用AMQP客户端登录,并接收消息 参考官方文档,这里就不作详细阐述。https://help.aliyun.com/document_detail/143601.html?spm=a2c4g.11186623.6.624.304e354e2OEGFh 代码示例:pom.xml <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>org.example</groupId> <artifactId>Test</artifactId> <version>1.0-SNAPSHOT</version> <dependencies> <!-- amqp 1.0 qpid client --> <!-- <dependency>--> <!-- <groupId>org.apache.qpid</groupId>--> <!-- <artifactId>qpid-jms-client</artifactId>--> <!-- <version>0.47.0</version>--> <!-- </dependency>--> <!-- https://mvnrepository.com/artifact/commons-codec/commons-codec --> <dependency> <groupId>commons-codec</groupId> <artifactId>commons-codec</artifactId> <version>1.10</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.qpid/qpid-jms-client --> <dependency> <groupId>org.apache.qpid</groupId> <artifactId>qpid-jms-client</artifactId> <version>0.47.0</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-simple</artifactId> <version>1.7.25</version> <scope>compile</scope> </dependency> <dependency> <groupId>org.apache.maven.surefire</groupId> <artifactId>maven-surefire-common</artifactId> <version>2.12.4</version> </dependency> <!-- util for base64--> <!-- <dependency>--> <!-- <groupId>commons-codec</groupId>--> <!-- <artifactId>commons-codec</artifactId>--> <!-- <version>1.3</version>--> <!-- </dependency>--> </dependencies> </project> Demo package com.alibaba; import org.apache.commons.codec.binary.Base64; import org.apache.qpid.jms.JmsConnection; import org.apache.qpid.jms.JmsConnectionListener; import org.apache.qpid.jms.message.JmsInboundMessageDispatch; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.crypto.Mac; import javax.crypto.spec.SecretKeySpec; import javax.jms.*; import javax.naming.Context; import javax.naming.InitialContext; import java.net.URI; import java.util.Hashtable; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class AmqpJavaClientDemo { private final static Logger logger = LoggerFactory.getLogger(AmqpJavaClientDemo.class); //业务处理异步线程池,线程池参数可以根据您的业务特点调整,或者您也可以用其他异步方式处理接收到的消息。 private final static ExecutorService executorService = new ThreadPoolExecutor( Runtime.getRuntime().availableProcessors(), Runtime.getRuntime().availableProcessors() * 2, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(50000)); public static void main(String[] args) throws Exception { //参数说明,请参见AMQP客户端接入说明文档。 String accessKey = "LTAI4G2*****"; String accessSecret = "Mp2f4qopmULI6*****"; String consumerGroupId = "e0oRIYMSOYwQ*****"; //iotInstanceId:购买的实例请填写实例ID,公共实例请填空字符串""。 String iotInstanceId = ""; long timeStamp = System.currentTimeMillis(); //签名方法:支持hmacmd5、hmacsha1和hmacsha256。 String signMethod = "hmacsha1"; //控制台服务端订阅中消费组状态页客户端ID一栏将显示clientId参数。 //建议使用机器UUID、MAC地址、IP等唯一标识等作为clientId。便于您区分识别不同的客户端。 String clientId = "yangboClientId"; //userName组装方法,请参见AMQP客户端接入说明文档。 String userName = clientId + "|authMode=aksign" + ",signMethod=" + signMethod + ",timestamp=" + timeStamp + ",authId=" + accessKey + ",iotInstanceId=" + iotInstanceId + ",consumerGroupId=" + consumerGroupId + "|"; //计算签名,password组装方法,请参见AMQP客户端接入说明文档。 String signContent = "authId=" + accessKey + "&timestamp=" + timeStamp; String password = doSign(signContent,accessSecret, signMethod); //接入域名,请参见AMQP客户端接入说明文档。 String connectionUrl = "failover:(amqps://1875496626634053.iot-amqp.cn-shanghai.aliyuncs.com:5671?amqp.idleTimeout=80000)" + "?failover.reconnectDelay=30"; Hashtable<String, String> hashtable = new Hashtable<String, String>(); hashtable.put("connectionfactory.SBCF",connectionUrl); hashtable.put("queue.QUEUE", "default"); hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory"); Context context = new InitialContext(hashtable); ConnectionFactory cf = (ConnectionFactory)context.lookup("SBCF"); Destination queue = (Destination)context.lookup("QUEUE"); // Create Connection Connection connection = cf.createConnection(userName, password); ((JmsConnection) connection).addConnectionListener(myJmsConnectionListener); System.out.println("connection success"); // Create Session // Session.CLIENT_ACKNOWLEDGE: 收到消息后,需要手动调用message.acknowledge()。 // Session.AUTO_ACKNOWLEDGE: SDK自动ACK(推荐)。 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); connection.start(); // Create Receiver Link MessageConsumer consumer = session.createConsumer(queue); consumer.setMessageListener(messageListener); } private static MessageListener messageListener = new MessageListener() { @Override public void onMessage(Message message) { try { //1.收到消息之后一定要ACK。 // 推荐做法:创建Session选择Session.AUTO_ACKNOWLEDGE,这里会自动ACK。 // 其他做法:创建Session选择Session.CLIENT_ACKNOWLEDGE,这里一定要调message.acknowledge()来ACK。 // message.acknowledge(); //2.建议异步处理收到的消息,确保onMessage函数里没有耗时逻辑。 // 如果业务处理耗时过程过长阻塞住线程,可能会影响SDK收到消息后的正常回调。 executorService.submit(new Runnable() { public void run() { processMessage(message); } }); } catch (Exception e) { logger.error("submit task occurs exception ", e); } } }; /** * 在这里处理您收到消息后的具体业务逻辑。 */ private static void processMessage(Message message) { try { byte[] body = message.getBody(byte[].class); String content = new String(body); String topic = message.getStringProperty("topic"); String messageId = message.getStringProperty("messageId"); System.out.println("receive message" + ", topic = " + topic + ", messageId = " + messageId + ", content = " + content); logger.info("receive message" + ", topic = " + topic + ", messageId = " + messageId + ", content = " + content); } catch (Exception e) { logger.error("processMessage occurs error ", e); } } private static JmsConnectionListener myJmsConnectionListener = new JmsConnectionListener() { /** * 连接成功建立。 */ @Override public void onConnectionEstablished(URI remoteURI) { logger.info("onConnectionEstablished, remoteUri:{}", remoteURI); } /** * 尝试过最大重试次数之后,最终连接失败。 */ @Override public void onConnectionFailure(Throwable error) { logger.error("onConnectionFailure, {}", error.getMessage()); } /** * 连接中断。 */ @Override public void onConnectionInterrupted(URI remoteURI) { logger.info("onConnectionInterrupted, remoteUri:{}", remoteURI); } /** * 连接中断后又自动重连上。 */ @Override public void onConnectionRestored(URI remoteURI) { logger.info("onConnectionRestored, remoteUri:{}", remoteURI); } @Override public void onInboundMessage(JmsInboundMessageDispatch envelope) {} @Override public void onSessionClosed(Session session, Throwable cause) {} @Override public void onConsumerClosed(MessageConsumer consumer, Throwable cause) {} @Override public void onProducerClosed(MessageProducer producer, Throwable cause) {} }; /** * 计算签名,password组装方法,请参见AMQP客户端接入说明文档。 */ private static String doSign(String toSignString, String secret, String signMethod) throws Exception { SecretKeySpec signingKey = new SecretKeySpec(secret.getBytes(), signMethod); Mac mac = Mac.getInstance(signMethod); mac.init(signingKey); byte[] rawHmac = mac.doFinal(toSignString.getBytes()); return Base64.encodeBase64String(rawHmac); // return Arrays.toString(Base64.encodeBase64(rawHmac)); } } 实测效果:云平台流转方式的返回结果: AMQP订阅方式返回结果:
2021年01月
2020年12月
2020年10月