阿里云物联网平台数据转发到函数计算示例

简介: 本文主要演示通过规则引擎将设备上行消息流转到函数计算,并通过函数计算发送消息到钉钉机器人。

作者:俏巴

概述


使用物联网平台规则引擎的数据流转功能,可将Topic中的数据消息转发至其他Topic或其他阿里云产品进行存储或处理。本文主要演示通过规则引擎将设备上行消息流转到函数计算,并通过函数计算发送消息到钉钉机器人。


Step By Step


产品及设备准备



1、创建产品
_


2、定义物模型
_


3、添加设备
_


_


4、使用SDK 上行消息,参考链接:基于开源JAVA MQTT Client连接阿里云IoT


import com.alibaba.taro.AliyunIoTSignUtil;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class IoTDemoPubSubDemo {

<span class="hljs-comment">// 设备三元组信息</span>
public <span class="hljs-keyword">static</span> <span class="hljs-built_in">String</span> productKey = <span class="hljs-string">"a16MX********"</span>;
public <span class="hljs-keyword">static</span> <span class="hljs-built_in">String</span> deviceName = <span class="hljs-string">"device1"</span>;
public <span class="hljs-keyword">static</span> <span class="hljs-built_in">String</span> deviceSecret = <span class="hljs-string">"YGLHxUr40E1JaWhk3IVAm0uk********"</span>;
public <span class="hljs-keyword">static</span> <span class="hljs-built_in">String</span> regionId = <span class="hljs-string">"cn-shanghai"</span>;

<span class="hljs-comment">// 物模型-属性上报topic</span>
private <span class="hljs-keyword">static</span> <span class="hljs-built_in">String</span> pubTopic = <span class="hljs-string">"/sys/"</span> + productKey + <span class="hljs-string">"/"</span> + deviceName + <span class="hljs-string">"/thing/event/property/post"</span>;
<span class="hljs-comment">// 自定义topic,在产品Topic列表位置定义</span>
private <span class="hljs-keyword">static</span> <span class="hljs-built_in">String</span> subTopic = <span class="hljs-string">"/sys/"</span> + productKey + <span class="hljs-string">"/"</span> + deviceName + <span class="hljs-string">"/thing/event/property/post_reply"</span>;

private <span class="hljs-keyword">static</span> MqttClient mqttClient;

public <span class="hljs-keyword">static</span> <span class="hljs-keyword">void</span> main(<span class="hljs-built_in">String</span> [] args){

    initAliyunIoTClient();
    ScheduledExecutorService scheduledThreadPool = <span class="hljs-keyword">new</span> ScheduledThreadPoolExecutor(<span class="hljs-number">1</span>,
            <span class="hljs-keyword">new</span> ThreadFactoryBuilder().setNameFormat(<span class="hljs-string">"thread-runner-%d"</span>).build());

    scheduledThreadPool.scheduleAtFixedRate(()-&gt;postDeviceProperties(), <span class="hljs-number">10</span>,<span class="hljs-number">5</span>, TimeUnit.SECONDS);

    <span class="hljs-keyword">try</span> {
        mqttClient.subscribe(subTopic); <span class="hljs-comment">// 订阅Topic</span>
    } <span class="hljs-keyword">catch</span> (MqttException e) {
        System.out.println(<span class="hljs-string">"error:"</span> + e.getMessage());
        e.printStackTrace();
    }

    <span class="hljs-comment">// 设置订阅监听</span>
    mqttClient.setCallback(<span class="hljs-keyword">new</span> MqttCallback() {
        @Override
        public <span class="hljs-keyword">void</span> connectionLost(Throwable throwable) {
            System.out.println(<span class="hljs-string">"connection Lost"</span>);

        }

        @Override
        public <span class="hljs-keyword">void</span> messageArrived(<span class="hljs-built_in">String</span> s, MqttMessage mqttMessage) throws Exception {
            System.out.println(<span class="hljs-string">"Sub message"</span>);
            System.out.println(<span class="hljs-string">"Topic : "</span> + s);
            System.out.println(<span class="hljs-keyword">new</span> <span class="hljs-built_in">String</span>(mqttMessage.getPayload())); <span class="hljs-comment">//打印输出消息payLoad</span>
        }

        @Override
        public <span class="hljs-keyword">void</span> deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {

        }
    });

}

<span class="hljs-comment">/**
 * 初始化 Client 对象
 */</span>
private <span class="hljs-keyword">static</span> <span class="hljs-keyword">void</span> initAliyunIoTClient() {

    <span class="hljs-keyword">try</span> {
        <span class="hljs-comment">// 构造连接需要的参数</span>
        <span class="hljs-built_in">String</span> clientId = <span class="hljs-string">"java"</span> + System.currentTimeMillis();
        <span class="hljs-built_in">Map</span>&lt;<span class="hljs-built_in">String</span>, <span class="hljs-built_in">String</span>&gt; params = <span class="hljs-keyword">new</span> HashMap&lt;&gt;(<span class="hljs-number">16</span>);
        params.put(<span class="hljs-string">"productKey"</span>, productKey);
        params.put(<span class="hljs-string">"deviceName"</span>, deviceName);
        params.put(<span class="hljs-string">"clientId"</span>, clientId);
        <span class="hljs-built_in">String</span> timestamp = <span class="hljs-built_in">String</span>.valueOf(System.currentTimeMillis());
        params.put(<span class="hljs-string">"timestamp"</span>, timestamp);
        <span class="hljs-comment">// cn-shanghai</span>
        <span class="hljs-built_in">String</span> targetServer = <span class="hljs-string">"tcp://"</span> + productKey + <span class="hljs-string">".iot-as-mqtt."</span>+regionId+<span class="hljs-string">".aliyuncs.com:1883"</span>;

        <span class="hljs-built_in">String</span> mqttclientId = clientId + <span class="hljs-string">"|securemode=3,signmethod=hmacsha1,timestamp="</span> + timestamp + <span class="hljs-string">"|"</span>;
        <span class="hljs-built_in">String</span> mqttUsername = deviceName + <span class="hljs-string">"&amp;"</span> + productKey;
        <span class="hljs-built_in">String</span> mqttPassword = AliyunIoTSignUtil.sign(params, deviceSecret, <span class="hljs-string">"hmacsha1"</span>);

        connectMqtt(targetServer, mqttclientId, mqttUsername, mqttPassword);

    } <span class="hljs-keyword">catch</span> (Exception e) {
        System.out.println(<span class="hljs-string">"initAliyunIoTClient error "</span> + e.getMessage());
    }
}

public <span class="hljs-keyword">static</span> <span class="hljs-keyword">void</span> connectMqtt(<span class="hljs-built_in">String</span> url, <span class="hljs-built_in">String</span> clientId, <span class="hljs-built_in">String</span> mqttUsername, <span class="hljs-built_in">String</span> mqttPassword) throws Exception {

    MemoryPersistence persistence = <span class="hljs-keyword">new</span> MemoryPersistence();
    mqttClient = <span class="hljs-keyword">new</span> MqttClient(url, clientId, persistence);
    MqttConnectOptions connOpts = <span class="hljs-keyword">new</span> MqttConnectOptions();
    <span class="hljs-comment">// MQTT 3.1.1</span>
    connOpts.setMqttVersion(<span class="hljs-number">4</span>);
    connOpts.setAutomaticReconnect(<span class="hljs-literal">false</span>);

// connOpts.setCleanSession(true);

    connOpts.setCleanSession(<span class="hljs-literal">false</span>);

    connOpts.setUserName(mqttUsername);
    connOpts.setPassword(mqttPassword.toCharArray());
    connOpts.setKeepAliveInterval(<span class="hljs-number">60</span>);

    mqttClient.connect(connOpts);
}

<span class="hljs-comment">/**
 * 汇报属性
 */</span>
private <span class="hljs-keyword">static</span> <span class="hljs-keyword">void</span> postDeviceProperties() {

    <span class="hljs-keyword">try</span> {
        <span class="hljs-comment">//上报数据</span>
        <span class="hljs-comment">//高级版 物模型-属性上报payload</span>
        System.out.println(<span class="hljs-string">"上报属性值"</span>);
        <span class="hljs-built_in">String</span> payloadJson = <span class="hljs-string">"{\"params\":{\"CurrentTemperature\":13,\"Humidity\":10}}"</span>;
        MqttMessage message = <span class="hljs-keyword">new</span> MqttMessage(payloadJson.getBytes(<span class="hljs-string">"utf-8"</span>));
        message.setQos(<span class="hljs-number">1</span>);
        mqttClient.publish(pubTopic, message);
    } <span class="hljs-keyword">catch</span> (Exception e) {
        System.out.println(e.getMessage());
    }
}

}


5、运行状态查看
_




函数计算创建与配置

1、创建应用
_


_


2、应用下面添加函数


_


_


3、编辑脚本


const https = require('https');

const accessToken = '填写accessToken,即钉钉机器人webhook的accessToken';
module.exports.handler = function(event, context, callback) {
var eventJson = JSON.parse(event.toString());
console.log(event.toString());
//钉钉消息格式
const postData = JSON.stringify({
"msgtype": "markdown",
"markdown": {
"title": "设备温湿度传感器",
"text": "#### 温湿度传感器上报n" +
"> 设备名称:" + eventJson.deviceName+ "nn" +
"> 实时温度:" + eventJson.Temperature + "℃nn" +
"> 相对湿度:" + eventJson.Humidity + "%nn" +
"> ###### " + eventJson.time + " 发布 by 物联网平台 n"
},
"at": {
"isAtAll": false
}
});
const options = {
hostname: 'oapi.dingtalk.com',
port: 443,
path: '/robot/send?access_token=' + accessToken,
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Content-Length': Buffer.byteLength(postData)
}
};
const req = https.request(options, (res) => {
res.setEncoding('utf8');
res.on('data', (chunk) => {});
res.on('end', () => {
callback(null, 'success');
});
});
// 异常返回
req.on('error', (e) => {
callback(e);
});
// 写入数据
req.write(postData);
req.end();
};


钉钉机器人webhook的accessToken获取参考链接:阿里云IoT Studio服务开发定时关灯功能示例Demo: 2.3 钉钉机器人Webhook获取 部分。


4、快速测试
_


_




规则引擎配置

1、创建规则引擎
_


2、配置处理数据


_


SQL字段


deviceName() as deviceName, items.Humidity.value as Humidity, items.CurrentTemperature.value as Temperature, timestamp('yyyy-MM-dd HH:mm:ss') as time

3、配置转发数据


_


4、启动设备端SDK,周期性上行消息,钉钉群查看通知


_


5、上行日志查看


_


参考链接


温湿度计上报数据到钉钉群机器人

相关实践学习
【AI破次元壁合照】少年白马醉春风,函数计算一键部署AI绘画平台
本次实验基于阿里云函数计算产品能力开发AI绘画平台,可让您实现“破次元壁”与角色合照,为角色换背景效果,用AI绘图技术绘出属于自己的少年江湖。
从 0 入门函数计算
在函数计算的架构中,开发者只需要编写业务代码,并监控业务运行情况就可以了。这将开发者从繁重的运维工作中解放出来,将精力投入到更有意义的开发任务上。
相关文章
|
2月前
|
机器学习/深度学习 人工智能 Serverless
吉利汽车携手阿里云函数计算,打造新一代 AI 座舱推理引擎
当前吉利汽车研究院人工智能团队承担了吉利汽车座舱 AI 智能化的方案建设,在和阿里云的合作中,基于星睿智算中心 2.0 的 23.5EFLOPS 强大算力,构建 AI 混合云架构,面向百万级用户的实时推理计算引入阿里云函数计算的 Serverless GPU 算力集群,共同为智能座舱的交互和娱乐功能提供大模型推理业务服务,涵盖的场景如针对模糊指令的复杂意图解析、文生图、情感 TTS 等。
|
3月前
|
消息中间件 运维 监控
爆款游戏背后:尚娱如何借助阿里云 Kafka Serverless 轻松驾驭“潮汐流量”?
阿里云 Kafka 不仅为尚娱提供了高可靠、低延迟的消息通道,更通过 Serverless 弹性架构实现了资源利用率和成本效益的双重优化,助力尚娱在快速迭代的游戏市场中实现敏捷运营、稳定交付与可持续增长。
216 35
|
3月前
|
消息中间件 安全 物联网
海量接入、毫秒响应:易易互联携手阿里云构筑高可用物联网消息中枢
面对换电生态高速发展的通信挑战,易易互联通过采用阿里云 MQTT + RocketMQ 的融合解决方案,成功构建了“海量接入、实时响应、弹性处理、安全可信”的物联网通信底座。该架构不仅显著提升了系统稳定性与可扩展性,更保障了高并发场景下的业务连续性,为实现“让换电成为营运补能第一选择”的战略目标提供了坚实的技术支撑。
218 37
|
3月前
|
人工智能 机器人 Serverless
安诺机器人 X 阿里云函数计算 AI 咖啡印花解决方案
当云计算遇见具身智能,AI咖啡开启零售新体验。用户通过手机生成个性化图像,云端AI快速渲染,机器人精准复刻于咖啡奶泡之上,90秒内完成一杯可饮用的艺术品。该方案融合阿里云FunctionAI生图能力与安诺机器人高精度执行系统,实现AIGC创意到实体呈现的闭环,为线下零售提供低成本、高互动、易部署的智能化升级路径,已在商场、机场、展馆等场景落地应用。
安诺机器人 X 阿里云函数计算 AI 咖啡印花解决方案
|
3月前
|
消息中间件 存储 运维
嘉银科技基于阿里云 Kafka Serverless 提升业务弹性能力,节省成本超过 20%
云消息队列 Kafka 版 Serverless 系列凭借其秒级弹性扩展、按需付费、轻运维的优势,助力嘉银科技业务系统实现灵活扩缩容,在业务效率和成本优化上持续取得突破,保证服务的敏捷性和稳定性,并节省超过 20% 的成本。
223 24
|
2月前
|
人工智能 运维 安全
阿里云函数计算 AgentRun 全新发布,构筑智能体时代的基础设施
阿里云推出以函数计算为核心的AgentRun平台,通过创新体系解决开发、部署、运维难题,提供全面支持,已在多个真实业务场景验证,是AI原生时代重要基础设施。
|
3月前
|
人工智能 运维 安全
阿里云函数计算 AgentRun 全新发布,构筑智能体时代的基础设施
云原生应用平台 Serverless 计算负责人杨皓然在云栖大会发表主题演讲“Serverless Agent 基础设施:助力大规模 Agent 部署与运维”。本议题深入介绍了阿里云以函数计算为核心打造的 Agent 基础设施——AgentRun,阐述其如何通过创新的运行时、模型服务、网关及可观测体系,为企业构筑坚实、高效、安全的 Agent 时代基石。
|
2月前
|
人工智能 运维 Kubernetes
Serverless 应用引擎 SAE:为传统应用托底,为 AI 创新加速
在容器技术持续演进与 AI 全面爆发的当下,企业既要稳健托管传统业务,又要高效落地 AI 创新,如何在复杂的基础设施与频繁的版本变化中保持敏捷、稳定与低成本,成了所有技术团队的共同挑战。阿里云 Serverless 应用引擎(SAE)正是为应对这一时代挑战而生的破局者,SAE 以“免运维、强稳定、极致降本”为核心,通过一站式的应用级托管能力,同时支撑传统应用与 AI 应用,让企业把更多精力投入到业务创新。
442 29
|
3月前
|
存储 人工智能 Serverless
函数计算进化之路:AI 应用运行时的状态剖析
AI应用正从“请求-响应”迈向“对话式智能体”,推动Serverless架构向“会话原生”演进。阿里云函数计算引领云上 AI 应用 Serverless 运行时技术创新,实现性能、隔离与成本平衡,开启Serverless AI新范式。
450 12
|
8月前
|
SQL 分布式计算 Serverless
鹰角网络:EMR Serverless Spark 在《明日方舟》游戏业务的应用
鹰角网络为应对游戏业务高频活动带来的数据潮汐、资源弹性及稳定性需求,采用阿里云 EMR Serverless Spark 替代原有架构。迁移后实现研发效率提升,支持业务快速发展、计算效率提升,增强SLA保障,稳定性提升,降低运维成本,并支撑全球化数据架构部署。
838 56
鹰角网络:EMR Serverless Spark 在《明日方舟》游戏业务的应用

热门文章

最新文章

相关产品

  • 物联网平台